You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/03/12 21:11:32 UTC

svn commit: r922412 - in /qpid/trunk/qpid: cpp/src/qpid/broker/ cpp/src/qpid/cluster/ cpp/src/tests/ python/qpid/

Author: aconway
Date: Fri Mar 12 20:11:31 2010
New Revision: 922412

URL: http://svn.apache.org/viewvc?rev=922412&view=rev
Log:
New cluster member pushes store when joining an active cluster.

Previously a broker with a clean store would not be able to join an
active cluster because the shtudown-id did not match. This commit
ensures that when a broker joins an active cluster, it always pushes
its store regardless of status. Clean/dirty status is only compared
when forming an initial cluster.

This change required splitting initialization into two phases:

PRE_INIT: occurs in the Cluster ctor during early-initialize. This
phase determines whether or not to push the store.

INIT: occurs after Cluster::initialize and does the remaining
initialization chores.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp
    qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
    qpid/trunk/qpid/python/qpid/brokertest.py

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp?rev=922412&r1=922411&r2=922412&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp Fri Mar 12 20:11:31 2010
@@ -79,7 +79,7 @@ void IncompleteMessageList::each(const C
         sys::Mutex::ScopedLock l(lock);
         snapshot = incomplete;
     }
-    std::for_each(incomplete.begin(), incomplete.end(), listen); // FIXME aconway 2008-11-07: passed by ref or value?
+    std::for_each(incomplete.begin(), incomplete.end(), listen);
 }
 
 }}

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=922412&r1=922411&r2=922412&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Mar 12 20:11:31 2010
@@ -16,7 +16,8 @@
  *
  */
 
-/** CLUSTER IMPLEMENTATION OVERVIEW
+/**
+ * <h1>CLUSTER IMPLEMENTATION OVERVIEW</h1>
  *
  * The cluster works on the principle that if all members of the
  * cluster receive identical input, they will all produce identical
@@ -41,12 +42,15 @@
  *  
  * The following are the current areas where broker uses timers or timestamps:
  * 
- * - Producer flow control: broker::SemanticState uses connection::getClusterOrderOutput.
- *   a FrameHandler that sends frames to the client via the cluster. Used by broker::SessionState
+ * - Producer flow control: broker::SemanticState uses
+ *   connection::getClusterOrderOutput.  a FrameHandler that sends
+ *   frames to the client via the cluster. Used by broker::SessionState
  *   
- * - QueueCleaner, Message TTL: uses ExpiryPolicy, which is implemented by cluster::ExpiryPolicy.
+ * - QueueCleaner, Message TTL: uses ExpiryPolicy, which is
+ *   implemented by cluster::ExpiryPolicy.
  * 
- * - Connection heartbeat: sends connection controls, not part of session command counting so OK to ignore.
+ * - Connection heartbeat: sends connection controls, not part of
+ *   session command counting so OK to ignore.
  * 
  * - LinkRegistry: only cluster elder is ever active for links.
  * 
@@ -57,7 +61,10 @@
  *
  * cluster::ExpiryPolicy implements the strategy for message expiry.
  *
- * CLUSTER PROTOCOL OVERVIEW
+ * ClusterTimer implements periodic timed events in the cluster context.
+ * Used for periodic management events.
+ *
+ * <h1>CLUSTER PROTOCOL OVERVIEW</h1>
  * 
  * Messages sent to/from CPG are called Events.
  *
@@ -84,12 +91,16 @@
  *  - Connection control events carrying non-cluster frames: frames sent to the client.
  *    e.g. flow-control frames generated on a timer.
  *
- * CLUSTER INITIALIZATION OVERVIEW
+ * <h1>CLUSTER INITIALIZATION OVERVIEW</h1>
+ *
+ * @see InitialStatusMap
  *
  * When a new member joins the CPG group, all members (including the
  * new one) multicast their "initial status." The new member is in
- * INIT mode until it gets a complete set of initial status messages
- * from all cluster members.
+ * PRE_INIT mode until it gets a complete set of initial status
+ * messages from all cluster members. In a newly-forming cluster is
+ * then in INIT mode until the configured cluster-size members have
+ * joined.
  *
  * The newcomer uses initial status to determine
  *  - The cluster UUID
@@ -97,11 +108,16 @@
  *  - Do I need to get an update from an existing active member?
  *  - Can I recover from my own store?
  *
- * Initialization happens in the Cluster constructor (plugin
- * early-init phase) because it needs to be done before the store
- * initializes. In INIT mode sending & receiving from the cluster are
- * done single-threaded, bypassing the normal PollableQueues because
- * the Poller is not active at this point to service them.
+ * Pre-initialization happens in the Cluster constructor (plugin
+ * early-init phase) because it needs to set the recovery flag before
+ * the store initializes. This phase lasts until inital-status is
+ * received for all active members. The PollableQueues and Multicaster
+ * are in "bypass" mode during this phase since the poller has not
+ * started so there are no threads to serve pollable queues.
+ *
+ * The remaining initialization happens in Cluster::initialize() or,
+ * if cluster-size=N is specified, in the deliver thread when an
+ * initial-status control is delivered that brings the total to N.
  */
 #include "qpid/Exception.h"
 #include "qpid/cluster/Cluster.h"
@@ -244,7 +260,7 @@ Cluster::Cluster(const ClusterSettings& 
     quorum(boost::bind(&Cluster::leave, this)),
     decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
     discarding(true),
-    state(INIT),
+    state(PRE_INIT),
     initMap(self, settings.size),
     store(broker.getDataDir().getPath()),
     elder(false),
@@ -274,17 +290,18 @@ Cluster::Cluster(const ClusterSettings& 
     // without modifying delivery-properties.exchange.
     broker.getExchanges().registerExchange(
         boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
+
     // Load my store status before we go into initialization
     if (! broker::NullMessageStore::isNullStore(&broker.getStore())) {
         store.load();
-        if (store.getState() == STORE_STATE_DIRTY_STORE)
-            broker.setRecovery(false); // Ditch my current store.
         if (store.getClusterId())
             clusterId = store.getClusterId(); // Use stored ID if there is one.
         QPID_LOG(notice, "Cluster store state: " << store)
     }
-
     cpg.join(name);
+    // pump the CPG dispatch manually till we get past PRE_INIT.
+    while (state == PRE_INIT)
+        cpg.dispatchOne();
 }
 
 Cluster::~Cluster() {
@@ -301,9 +318,14 @@ void Cluster::initialize() {
     dispatcher.start();
     deliverEventQueue.start();
     deliverFrameQueue.start();
+    mcast.start();
+
+    // Run initMapCompleted immediately to process the initial configuration.
+    assert(state == INIT);
+    initMapCompleted(*(Mutex::ScopedLock*)0); // Fake lock, single-threaded context.
 
     // Add finalizer last for exception safety.
-    broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); 
+    broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this));
 }
 
 // Called in connection thread to insert a client connection.
@@ -579,9 +601,27 @@ void Cluster::setReady(Lock&) {
 void Cluster::initMapCompleted(Lock& l) {
     // Called on completion of the initial status map. 
     QPID_LOG(debug, *this << " initial status map complete. ");
-    if (state == INIT) {
-        // We have status for all members so we can make join descisions.
+    if (state == PRE_INIT) {
+        // PRE_INIT means we're still in the earlyInitialize phase, in the constructor.
+        // We decide here whether we want to recover from our store.
+        // We won't recover if we are joining an active cluster or our store is dirty.
+        if (store.hasStore() &&
+            (initMap.isActive() || store.getState() == STORE_STATE_DIRTY_STORE))
+            broker.setRecovery(false); // Ditch my current store.
+        state = INIT;
+    }
+    else if (state == INIT) {
+        // INIT means we are past Cluster::initialize().
+
+        // If we're forming an initial cluster (no active members)
+        // then we wait to reach the configured cluster-size
+        if (!initMap.isActive() && initMap.getActualSize() < initMap.getRequiredSize()) {
+            QPID_LOG(info, *this << initMap.getActualSize()
+                     << " members, waiting for at least " << initMap.getRequiredSize());
+            return;
+        }
         initMap.checkConsistent();
+
         elders = initMap.getElders();
         QPID_LOG(debug, *this << " elders: " << elders);
         if (elders.empty())
@@ -969,7 +1009,8 @@ void Cluster::memberUpdate(Lock& l) {
 
 std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
     static const char* STATE[] = {
-        "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT"
+        "PRE_INIT", "INIT", "JOINER", "UPDATEE", "CATCHUP",
+        "READY", "OFFER", "UPDATER", "LEFT"
     };
     assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1);
     o << "cluster(" << cluster.self << " " << STATE[cluster.state];
@@ -1009,12 +1050,14 @@ void Cluster::errorCheck(const MemberId&
 }
 
 void Cluster::timerWakeup(const MemberId& , const std::string& name, Lock&) {
-    timer->deliverWakeup(name);
+    if (state >= CATCHUP) // Pre catchup our timer isn't set up.
+        timer->deliverWakeup(name);
 }
 
 void Cluster::timerDrop(const MemberId& , const std::string& name, Lock&) {
     QPID_LOG(debug, "Cluster timer drop " << map.getFrameSeq() << ": " << name)
-    timer->deliverDrop(name);
+    if (state >= CATCHUP) // Pre catchup our timer isn't set up.
+        timer->deliverDrop(name);
 }
 
 bool Cluster::isElder() const {

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=922412&r1=922411&r2=922412&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Fri Mar 12 20:11:31 2010
@@ -180,6 +180,7 @@ class Cluster : private Cpg::Handler, pu
     void memberUpdate(Lock&);
     void setClusterId(const framing::Uuid&, Lock&);
     void erase(const ConnectionId&, Lock&);       
+    void requestUpdate(Lock& );
     void initMapCompleted(Lock&);
     void becomeElder(Lock&);
 
@@ -251,7 +252,8 @@ class Cluster : private Cpg::Handler, pu
 
     //    Local cluster state, cluster map
     enum {
-        INIT,    ///< Establishing inital cluster stattus.
+        PRE_INIT,///< Have not yet received complete initial status map.
+        INIT,    ///< Waiting to reach cluster-size.
         JOINER,  ///< Sent update request, waiting for update offer.
         UPDATEE, ///< Stalled receive queue at update offer, waiting for update to complete.
         CATCHUP, ///< Update complete, unstalled but has not yet seen own "ready" event.

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp?rev=922412&r1=922411&r2=922412&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp Fri Mar 12 20:11:31 2010
@@ -86,8 +86,7 @@ bool InitialStatusMap::notInitialized(co
 }
 
 bool InitialStatusMap::isComplete() const {
-    return !map.empty() && find_if(map.begin(), map.end(), &notInitialized) == map.end()
-        && (map.size() >= size);
+    return !map.empty() && find_if(map.begin(), map.end(), &notInitialized) == map.end();
 }
 
 bool InitialStatusMap::transitionToComplete() {
@@ -100,7 +99,7 @@ bool InitialStatusMap::isResendNeeded() 
     return ret;
 }
 
-bool InitialStatusMap::isActive(const Map::value_type& v) {
+bool InitialStatusMap::isActiveEntry(const Map::value_type& v) {
     return v.second && v.second->getActive();
 }
 
@@ -110,10 +109,15 @@ bool InitialStatusMap::hasStore(const Ma
          v.second->getStoreState() == STORE_STATE_DIRTY_STORE);
 }
 
+bool InitialStatusMap::isActive() {
+    assert(isComplete());
+    return (find_if(map.begin(), map.end(), &isActiveEntry) != map.end());
+}
+
 bool InitialStatusMap::isUpdateNeeded() {
     assert(isComplete());
     // We need an update if there are any active members.
-    if (find_if(map.begin(), map.end(), &isActive) != map.end()) return true;
+    if (isActive()) return true;
 
     // Otherwise it depends on store status, get my own status:
     Map::iterator me = map.find(self);
@@ -154,7 +158,7 @@ MemberSet InitialStatusMap::getElders() 
 Uuid InitialStatusMap::getClusterId() {
     assert(isComplete());
     assert(!map.empty());
-    Map::iterator i = find_if(map.begin(), map.end(), &isActive);
+    Map::iterator i = find_if(map.begin(), map.end(), &isActiveEntry);
     if (i != map.end())
         return i->second->getClusterId(); // An active member
     else
@@ -178,6 +182,7 @@ void InitialStatusMap::checkConsistent()
     Uuid clusterId;
     Uuid shutdownId;
 
+    bool initialCluster = !isActive();
     for (Map::iterator i = map.begin(); i != map.end(); ++i) {
         assert(i->second);
         if (i->second->getActive()) ++active;
@@ -193,8 +198,10 @@ void InitialStatusMap::checkConsistent()
             ++clean;
             checkId(clusterId, i->second->getClusterId(),
                     "Cluster-ID mismatch. Stores belong to different clusters.");
-            checkId(shutdownId, i->second->getShutdownId(),
-                    "Shutdown-ID mismatch. Stores were not shut down together");
+            // Only need shutdownId to match if we are in an initially forming cluster.
+            if (initialCluster)
+                checkId(shutdownId, i->second->getShutdownId(),
+                        "Shutdown-ID mismatch. Stores were not shut down together");
             break;
         }
     }
@@ -202,10 +209,13 @@ void InitialStatusMap::checkConsistent()
     if (none && (clean+dirty+empty))
         throw Exception("Mixing transient and persistent brokers in a cluster");
 
-    // If there are no active members and there are dirty stores there
-    // must be at least one clean store.
-    if (!active && dirty && !clean)
-        throw Exception("Cannot recover, no clean store.");
+    if (map.size() >= size) {
+        // All initial members are present. If there are no active
+        // members and there are dirty stores there must be at least
+        // one clean store.
+        if (!active && dirty && !clean)
+            throw Exception("Cannot recover, no clean store.");
+    }
 }
 
 std::string InitialStatusMap::getFirstConfigStr() const {

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h?rev=922412&r1=922411&r2=922412&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h Fri Mar 12 20:11:31 2010
@@ -51,12 +51,18 @@ class InitialStatusMap
     /** Process received status */
     void received(const MemberId&, const Status& is);
 
-    /**@return true if the map is complete. */
+    /**@return true if the map has an entry for all current cluster members. */
     bool isComplete() const;
+
+    size_t getActualSize() const { return map.size(); }
+    size_t getRequiredSize() const { return size; }
+
     /**@return true if the map was completed by the last config change or received. */
     bool transitionToComplete();
     /**@pre isComplete(). @return this node's elders */
     MemberSet getElders() const;
+    /**@pre isComplete(). @return True if there are active members of the cluster. */
+    bool isActive();
     /**@pre isComplete(). @return True if we need to request an update. */
     bool isUpdateNeeded();
     /**@pre isComplete(). @return Cluster-wide cluster ID. */
@@ -71,8 +77,9 @@ class InitialStatusMap
   private:
     typedef std::map<MemberId, boost::optional<Status> > Map;
     static bool notInitialized(const Map::value_type&);
-    static bool isActive(const Map::value_type&);
+    static bool isActiveEntry(const Map::value_type&);
     static bool hasStore(const Map::value_type&);
+
     Map map;
     MemberSet firstConfig;
     MemberId self;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp?rev=922412&r1=922411&r2=922412&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp Fri Mar 12 20:11:31 2010
@@ -33,10 +33,8 @@ Multicaster::Multicaster(Cpg& cpg_, 
                          boost::function<void()> onError_) :
     onError(onError_), cpg(cpg_), 
     queue(boost::bind(&Multicaster::sendMcast, this, _1), poller),
-    ready(false)
-{
-    queue.start();
-}
+    ready(false), bypass(true)
+{}
 
 void Multicaster::mcastControl(const framing::AMQBody& body, const ConnectionId& id) {
     mcast(Event::control(body, id));
@@ -61,10 +59,16 @@ void Multicaster::mcast(const Event& e) 
         }
     }
     QPID_LOG(trace, "MCAST " << e);
-    queue.push(e);
+    if (bypass) {               // direct, don't queue
+        iovec iov = e.toIovec();
+        // FIXME aconway 2010-03-10: should do limited retry.
+        while (!cpg.mcast(&iov, 1))
+            ;
+    }
+    else
+        queue.push(e);
 }
 
-
 Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast(const PollableEventQueue::Batch& values) {
     try {
         PollableEventQueue::Batch::const_iterator i = values.begin();
@@ -86,6 +90,11 @@ Multicaster::PollableEventQueue::Batch::
     }
 }
 
+void Multicaster::start() {
+    queue.start();
+    bypass = false;
+}
+
 void Multicaster::setReady() {
     sys::Mutex::ScopedLock l(lock);
     ready = true;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h?rev=922412&r1=922411&r2=922412&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h Fri Mar 12 20:11:31 2010
@@ -41,16 +41,18 @@ class Cpg;
 
 /**
  * Multicast to the cluster. Shared, thread safe object.
- *
- * Runs in two modes;
  * 
- * initializing: Hold connection mcast events. Multicast cluster
- * events directly in the calling thread. This mode is used before
- * joining the cluster where the poller may not yet be active and we
- * want to hold any connection traffic till we join.
+ * holding mode: Hold connection events for later multicast. Cluster
+ * events are never held.  Used during PRE_INIT/INIT state when we
+ * want to hold any connection traffic till we are read in the
+ * cluster.
+ *
+ * bypass mode: Multicast cluster events directly in the calling
+ * thread. This mode is used by cluster in PRE_INIT state the poller
+ * is not yet be active.
  *
- * ready: normal operation. Queues all mcasts on a pollable queue,
- * multicasts connection and cluster events.
+ * Multicaster is created in bypass+holding mode, they are disabled by
+ * start and setReady respectively.
  */
 class Multicaster
 {
@@ -65,7 +67,9 @@ class Multicaster
     void mcastBuffer(const char*, size_t, const ConnectionId&);
     void mcast(const Event& e);
 
-    /** Switch to ready mode. */
+    /** Start the pollable queue, turn off bypass mode. */
+    void start();
+    /** Switch to ready mode, release held messages. */
     void setReady();
 
   private:
@@ -81,6 +85,7 @@ class Multicaster
     bool ready;
     PlainEventQueue holdingQueue;
     std::vector<struct ::iovec> ioVector;
+    bool bypass;
 };
 }} // namespace qpid::cluster
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h?rev=922412&r1=922411&r2=922412&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h Fri Mar 12 20:11:31 2010
@@ -31,6 +31,13 @@ namespace cluster {
 /**
  * More convenient version of PollableQueue that handles iterating
  * over the batch and error handling.
+ *
+ * Constructed in "bypass" mode where items are processed directly
+ * rather than put on the queue. This is important for the
+ * PRE_INIT stage when Cluster is pumping CPG dispatch directly
+ * before the poller has started.
+ *
+ * Calling start() starts the pollable queue and disabled bypass mode.
  */
 template <class T> class PollableQueue : public sys::PollableQueue<T> {
   public:
@@ -41,7 +48,7 @@ template <class T> class PollableQueue :
                   const boost::shared_ptr<sys::Poller>& poller)
         : sys::PollableQueue<T>(boost::bind(&PollableQueue<T>::handleBatch, this, _1),
                                 poller),
-          callback(f), error(err), message(msg)
+          callback(f), error(err), message(msg), bypass(true)
     {}
 
     typename sys::PollableQueue<T>::Batch::const_iterator
@@ -62,10 +69,21 @@ template <class T> class PollableQueue :
         }
     }
 
+    void push(const T& t) {
+        if (bypass) callback(t);
+        else sys::PollableQueue<T>::push(t);
+    }
+
+    void start() {
+        bypass = false;
+        sys::PollableQueue<T>::start();
+    }
+
   private:
     Callback callback;
     ErrorCallback error;
     std::string message;
+    bool bypass;
 };
 
     

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp?rev=922412&r1=922411&r2=922412&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp Fri Mar 12 20:11:31 2010
@@ -21,6 +21,7 @@
 #include "StoreStatus.h"
 #include "qpid/Exception.h"
 #include "qpid/Msg.h"
+#include "qpid/log/Statement.h"
 #include <boost/filesystem/path.hpp>
 #include <boost/filesystem/fstream.hpp>
 #include <boost/filesystem/operations.hpp>
@@ -54,24 +55,39 @@ Uuid loadUuid(const fs::path& path) {
     Uuid ret;
     if (exists(path)) {
         fs::ifstream i(path);
-        throw_exceptions(i);
-        i >> ret;
+        try {
+            throw_exceptions(i);
+            i >> ret;
+        } catch (const std::exception& e) {
+            QPID_LOG(error, "Cant load UUID from " << path.string() << ": " << e.what());
+            throw;
+        }
     }
     return ret;
 }
 
 void saveUuid(const fs::path& path, const Uuid& uuid) {
     fs::ofstream o(path);
-    throw_exceptions(o);
-    o << uuid;
+    try {
+        throw_exceptions(o);
+        o << uuid;
+    } catch (const std::exception& e) {
+        QPID_LOG(error, "Cant save UUID to " << path.string() << ": " << e.what());
+        throw;
+    }
 }
 
 framing::SequenceNumber loadSeqNum(const fs::path& path) {
     uint32_t n = 0;
     if (exists(path)) {
         fs::ifstream i(path);
-        throw_exceptions(i);
-        i >> n;
+        try {
+            throw_exceptions(i);
+            i >> n;
+        } catch (const std::exception& e) {
+            QPID_LOG(error, "Cant load sequence number from " << path.string() << ": " << e.what());
+            throw;
+        }
     }
     return framing::SequenceNumber(n);
 }
@@ -105,9 +121,14 @@ void StoreStatus::save() {
         create_directory(dir);
         saveUuid(dir/CLUSTER_ID_FILE, clusterId);
         saveUuid(dir/SHUTDOWN_ID_FILE, shutdownId);
-        fs::ofstream o(dir/CONFIG_SEQ_FILE);
-        throw_exceptions(o);
-        o << configSeq.getValue();
+        try {
+            fs::ofstream o(dir/CONFIG_SEQ_FILE);
+            throw_exceptions(o);
+            o << configSeq.getValue();
+        } catch (const std::exception& e) {
+            QPID_LOG(error, "Cant save sequence number to " << (dir/CONFIG_SEQ_FILE).string() << ": " << e.what());
+            throw;
+        }
     }
     catch (const std::exception&e) {
         throw Exception(QPID_MSG("Cannot save cluster store status: " << e.what()));

Modified: qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp?rev=922412&r1=922411&r2=922412&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp Fri Mar 12 20:11:31 2010
@@ -173,20 +173,6 @@ QPID_AUTO_TEST_CASE(testInteveningConfig
     BOOST_CHECK_EQUAL(map.getClusterId(), id);
 }
 
-QPID_AUTO_TEST_CASE(testInitialSize) {
-    InitialStatusMap map(MemberId(0), 3);
-    map.configChange(list_of<MemberId>(0)(1));
-    map.received(MemberId(0), newcomerStatus());
-    map.received(MemberId(1), newcomerStatus());
-    BOOST_CHECK(!map.isComplete());
-
-    map.configChange(list_of<MemberId>(0)(1)(2));
-    map.received(MemberId(0), newcomerStatus());
-    map.received(MemberId(1), newcomerStatus());
-    map.received(MemberId(2), newcomerStatus());
-    BOOST_CHECK(map.isComplete());
-}
-
 QPID_AUTO_TEST_CASE(testAllCleanNoUpdate) {
     InitialStatusMap map(MemberId(0), 3);
     map.configChange(list_of<MemberId>(0)(1)(2));
@@ -244,8 +230,6 @@ QPID_AUTO_TEST_CASE(testEmptyAlone) {
     BOOST_CHECK(!map.isUpdateNeeded());
 }
 
-// FIXME aconway 2009-11-20: consistency tests for mixed stores,
-
 QPID_AUTO_TEST_SUITE_END()
 
 }} // namespace qpid::tests

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=922412&r1=922411&r2=922412&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Fri Mar 12 20:11:31 2010
@@ -29,9 +29,19 @@ from itertools import chain
 
 log = getLogger("qpid.cluster_tests")
 
+# Note: brokers that shut themselves down due to critical error during
+# normal operation will still have an exit code of 0. Brokers that
+# shut down because of an error found during initialize will exit with
+# a non-0 code. Hence the apparently inconsistent use of EXPECT_EXIT_OK
+# and EXPECT_EXIT_FAIL in some of the tests below.
+
+# FIXME aconway 2010-03-11: resolve this - ideally any exit due to an error
+# should give non-0 exit status.
+
 # Import scripts as modules
 qpid_cluster=import_script(checkenv("QPID_CLUSTER_EXEC"))
 
+
 def readfile(filename):
     """Returns te content of file named filename as a string"""
     f = file(filename)
@@ -287,6 +297,11 @@ class StoreTests(BrokerTest):
         m = cluster.start("restartme").get_message("q")
         self.assertEqual("x", m.content)
 
+    def stop_cluster(self,broker):
+        """Clean shut-down of a cluster"""
+        self.assertEqual(0, qpid_cluster.main(
+            ["qpid-cluster", "-kf", broker.host_port()]))
+
     def test_persistent_restart(self):
         """Verify persistent cluster shutdown/restart scenarios"""
         cluster = self.cluster(0, args=self.args() + ["--cluster-size=3"])
@@ -302,7 +317,7 @@ class StoreTests(BrokerTest):
         self.assertEqual(c.get_message("q").content, "2")
         # Shut down the entire cluster cleanly and bring it back up
         a.send_message("q", Message("3", durable=True))
-        self.assertEqual(0, qpid_cluster.main(["qpid-cluster", "-kf", a.host_port()]))
+        self.stop_cluster(a)
         a = cluster.start("a", wait=False)
         b = cluster.start("b", wait=False)
         c = cluster.start("c", wait=True)
@@ -320,7 +335,7 @@ class StoreTests(BrokerTest):
         b.kill()
         self.assertEqual(c.get_message("q").content, "4")
         c.send_message("q", Message("clean", durable=True))
-        self.assertEqual(0, qpid_cluster.main(["qpid-cluster", "-kf", c.host_port()]))
+        self.stop_cluster(c)
         a = cluster.start("a", wait=False)
         b = cluster.start("b", wait=False)
         c = cluster.start("c", wait=True)
@@ -333,7 +348,7 @@ class StoreTests(BrokerTest):
         a.terminate()
         cluster2 = self.cluster(1, args=self.args())
         try:
-            a = cluster2.start("a", expect=EXPECT_EXIT_OK)
+            a = cluster2.start("a", expect=EXPECT_EXIT_FAIL)
             a.ready()
             self.fail("Expected exception")
         except: pass
@@ -343,27 +358,29 @@ class StoreTests(BrokerTest):
         cluster = self.cluster(0, args=self.args()+["--cluster-size=2"])
         a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
         b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False)
-        self.assertEqual(0, qpid_cluster.main(["qpid_cluster", "-kf", a.host_port()]))
+        self.stop_cluster(a)
         self.assertEqual(a.wait(), 0)
         self.assertEqual(b.wait(), 0)
 
         # Restart with a different member and shut down.
         a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
         c = cluster.start("c", expect=EXPECT_EXIT_OK, wait=False)
-        self.assertEqual(0, qpid_cluster.main(["qpid_cluster", "-kf", a.host_port()]))
+        self.stop_cluster(a)
         self.assertEqual(a.wait(), 0)
         self.assertEqual(c.wait(), 0)
-
         # Mix members from both shutdown events, they should fail
-        a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
-        b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False)
+        # FIXME aconway 2010-03-11: can't predict the exit status of these
+        # as it depends on the order of delivery of initial-status messages.
+        # See comment at top of this file.
+        a = cluster.start("a", expect=EXPECT_UNKNOWN, wait=False)
+        b = cluster.start("b", expect=EXPECT_UNKNOWN, wait=False)
         self.assertRaises(Exception, lambda: a.ready())
         self.assertRaises(Exception, lambda: b.ready())
 
     def assert_dirty_store(self, broker):
-        self.assertRaises(Exception, lambda: broker.ready())
+        assert retry(lambda: os.path.exists(broker.log)), "Missing log file %s"%broker.log
         msg = re.compile("critical.*no clean store")
-        assert msg.search(readfile(broker.log))
+        assert retry(lambda: msg.search(readfile(broker.log))), "Expected dirty store message in %s"%broker.log
 
     def test_solo_store_clean(self):
         # A single node cluster should always leave a clean store.
@@ -375,7 +392,6 @@ class StoreTests(BrokerTest):
         self.assertEqual(a.get_message("q").content, "x")
 
     def test_last_store_clean(self):
-
         # Verify that only the last node in a cluster to shut down has
         # a clean store. Start with cluster of 3, reduce to 1 then
         # increase again to ensure that a node that was once alone but
@@ -394,13 +410,41 @@ class StoreTests(BrokerTest):
         time.sleep(0.1)   # pause for a to find out hes last.
         a.kill()          # really last
         # b & c should be dirty
-        b = cluster.start("b", wait=False, expect=EXPECT_EXIT_OK)
+        b = cluster.start("b", wait=False, expect=EXPECT_EXIT_FAIL)
         self.assert_dirty_store(b)
-        c = cluster.start("c", wait=False, expect=EXPECT_EXIT_OK)
+        c = cluster.start("c", wait=False, expect=EXPECT_EXIT_FAIL)
         self.assert_dirty_store(c)
         # a should be clean
         a = cluster.start("a")
         self.assertEqual(a.get_message("q").content, "x")
 
-
+    def test_restart_clean(self):
+        """Verify that we can re-start brokers one by one in a
+        persistent cluster after a clean oshutdown"""
+        cluster = self.cluster(0, self.args())
+        a = cluster.start("a", expect=EXPECT_EXIT_OK)
+        b = cluster.start("b", expect=EXPECT_EXIT_OK)
+        c = cluster.start("c", expect=EXPECT_EXIT_OK)
+        a.send_message("q", Message("x", durable=True))
+        self.stop_cluster(a)
+        a = cluster.start("a")
+        b = cluster.start("b")
+        c = cluster.start("c")
+        self.assertEqual(c.get_message("q").content, "x")
+
+    def test_join_sub_size(self):
+        """Verify that after starting a cluster with cluster-size=N,
+        we can join new members even if size < N-1"""
+        cluster = self.cluster(0, self.args())
+        a = cluster.start("a", wait=False, expect=EXPECT_EXIT_FAIL)
+        b = cluster.start("b", wait=False, expect=EXPECT_EXIT_FAIL)
+        c = cluster.start("c")
+        a.send_message("q", Message("x", durable=True))
+        a.send_message("q", Message("y", durable=True))
+        a.kill()
+        b.kill()
+        a = cluster.start("a")
+        self.assertEqual(c.get_message("q").content, "x")
+        b = cluster.start("b")
+        self.assertEqual(c.get_message("q").content, "y")
 

Modified: qpid/trunk/qpid/python/qpid/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/brokertest.py?rev=922412&r1=922411&r2=922412&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/brokertest.py (original)
+++ qpid/trunk/qpid/python/qpid/brokertest.py Fri Mar 12 20:11:31 2010
@@ -268,6 +268,7 @@ class Broker(Popen):
         test.cleanup_stop(self)
         self._host = "localhost"
         log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log))
+        self._log_ready = False
 
     def host(self): return self._host
 
@@ -343,12 +344,14 @@ class Broker(Popen):
 
     def log_ready(self):
         """Return true if the log file exists and contains a broker ready message"""
+        if self._log_ready: return True
         if not os.path.exists(self.log): return False
-        ready_msg = re.compile("notice Broker running")
         f = file(self.log)
         try:
             for l in f:
-                if ready_msg.search(l): return True
+                if "notice Broker running" in l:
+                    self._log_ready = True
+                    return True
             return False
         finally: f.close()
 
@@ -445,7 +448,7 @@ class BrokerTest(TestCase):
         if (wait):
             try: b.ready()
             except Exception, e:
-                raise Exception("Failed to start broker %s: %s" % ( b.name, e))
+                raise Exception("Failed to start broker %s(%s): %s" % (b.name, b.log, e))
         return b
 
     def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True):



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org