You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/03/12 21:11:32 UTC
svn commit: r922412 - in /qpid/trunk/qpid: cpp/src/qpid/broker/
cpp/src/qpid/cluster/ cpp/src/tests/ python/qpid/
Author: aconway
Date: Fri Mar 12 20:11:31 2010
New Revision: 922412
URL: http://svn.apache.org/viewvc?rev=922412&view=rev
Log:
New cluster member pushes store when joining an active cluster.
Previously a broker with a clean store would not be able to join an
active cluster because the shutdown-id did not match. This commit
ensures that when a broker joins an active cluster, it always pushes
its store regardless of status. Clean/dirty status is only compared
when forming an initial cluster.
This change required splitting initialization into two phases:
PRE_INIT: occurs in the Cluster ctor during early-initialize. This
phase determines whether or not to push the store.
INIT: occurs after Cluster::initialize and does the remaining
initialization chores.
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h
qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h
qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h
qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp
qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp
qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
qpid/trunk/qpid/python/qpid/brokertest.py
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp?rev=922412&r1=922411&r2=922412&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp Fri Mar 12 20:11:31 2010
@@ -79,7 +79,7 @@ void IncompleteMessageList::each(const C
sys::Mutex::ScopedLock l(lock);
snapshot = incomplete;
}
- std::for_each(incomplete.begin(), incomplete.end(), listen); // FIXME aconway 2008-11-07: passed by ref or value?
+ std::for_each(incomplete.begin(), incomplete.end(), listen);
}
}}
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=922412&r1=922411&r2=922412&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Mar 12 20:11:31 2010
@@ -16,7 +16,8 @@
*
*/
-/** CLUSTER IMPLEMENTATION OVERVIEW
+/**
+ * <h1>CLUSTER IMPLEMENTATION OVERVIEW</h1>
*
* The cluster works on the principle that if all members of the
* cluster receive identical input, they will all produce identical
@@ -41,12 +42,15 @@
*
* The following are the current areas where broker uses timers or timestamps:
*
- * - Producer flow control: broker::SemanticState uses connection::getClusterOrderOutput.
- * a FrameHandler that sends frames to the client via the cluster. Used by broker::SessionState
+ * - Producer flow control: broker::SemanticState uses
+ * connection::getClusterOrderOutput. a FrameHandler that sends
+ * frames to the client via the cluster. Used by broker::SessionState
*
- * - QueueCleaner, Message TTL: uses ExpiryPolicy, which is implemented by cluster::ExpiryPolicy.
+ * - QueueCleaner, Message TTL: uses ExpiryPolicy, which is
+ * implemented by cluster::ExpiryPolicy.
*
- * - Connection heartbeat: sends connection controls, not part of session command counting so OK to ignore.
+ * - Connection heartbeat: sends connection controls, not part of
+ * session command counting so OK to ignore.
*
* - LinkRegistry: only cluster elder is ever active for links.
*
@@ -57,7 +61,10 @@
*
* cluster::ExpiryPolicy implements the strategy for message expiry.
*
- * CLUSTER PROTOCOL OVERVIEW
+ * ClusterTimer implements periodic timed events in the cluster context.
+ * Used for periodic management events.
+ *
+ * <h1>CLUSTER PROTOCOL OVERVIEW</h1>
*
* Messages sent to/from CPG are called Events.
*
@@ -84,12 +91,16 @@
* - Connection control events carrying non-cluster frames: frames sent to the client.
* e.g. flow-control frames generated on a timer.
*
- * CLUSTER INITIALIZATION OVERVIEW
+ * <h1>CLUSTER INITIALIZATION OVERVIEW</h1>
+ *
+ * @see InitialStatusMap
*
* When a new member joins the CPG group, all members (including the
* new one) multicast their "initial status." The new member is in
- * INIT mode until it gets a complete set of initial status messages
- * from all cluster members.
+ * PRE_INIT mode until it gets a complete set of initial status
+ * messages from all cluster members. In a newly-forming cluster it
+ * is then in INIT mode until the configured cluster-size number of
+ * members have joined.
*
* The newcomer uses initial status to determine
* - The cluster UUID
@@ -97,11 +108,16 @@
* - Do I need to get an update from an existing active member?
* - Can I recover from my own store?
*
- * Initialization happens in the Cluster constructor (plugin
- * early-init phase) because it needs to be done before the store
- * initializes. In INIT mode sending & receiving from the cluster are
- * done single-threaded, bypassing the normal PollableQueues because
- * the Poller is not active at this point to service them.
+ * Pre-initialization happens in the Cluster constructor (plugin
+ * early-init phase) because it needs to set the recovery flag before
+ * the store initializes. This phase lasts until initial-status is
+ * received for all active members. The PollableQueues and Multicaster
+ * are in "bypass" mode during this phase since the poller has not
+ * started so there are no threads to serve pollable queues.
+ *
+ * The remaining initialization happens in Cluster::initialize() or,
+ * if cluster-size=N is specified, in the deliver thread when an
+ * initial-status control is delivered that brings the total to N.
*/
#include "qpid/Exception.h"
#include "qpid/cluster/Cluster.h"
@@ -244,7 +260,7 @@ Cluster::Cluster(const ClusterSettings&
quorum(boost::bind(&Cluster::leave, this)),
decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
discarding(true),
- state(INIT),
+ state(PRE_INIT),
initMap(self, settings.size),
store(broker.getDataDir().getPath()),
elder(false),
@@ -274,17 +290,18 @@ Cluster::Cluster(const ClusterSettings&
// without modifying delivery-properties.exchange.
broker.getExchanges().registerExchange(
boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
+
// Load my store status before we go into initialization
if (! broker::NullMessageStore::isNullStore(&broker.getStore())) {
store.load();
- if (store.getState() == STORE_STATE_DIRTY_STORE)
- broker.setRecovery(false); // Ditch my current store.
if (store.getClusterId())
clusterId = store.getClusterId(); // Use stored ID if there is one.
QPID_LOG(notice, "Cluster store state: " << store)
}
-
cpg.join(name);
+ // pump the CPG dispatch manually till we get past PRE_INIT.
+ while (state == PRE_INIT)
+ cpg.dispatchOne();
}
Cluster::~Cluster() {
@@ -301,9 +318,14 @@ void Cluster::initialize() {
dispatcher.start();
deliverEventQueue.start();
deliverFrameQueue.start();
+ mcast.start();
+
+ // Run initMapCompleted immediately to process the initial configuration.
+ assert(state == INIT);
+ initMapCompleted(*(Mutex::ScopedLock*)0); // Fake lock, single-threaded context.
// Add finalizer last for exception safety.
- broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this));
+ broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this));
}
// Called in connection thread to insert a client connection.
@@ -579,9 +601,27 @@ void Cluster::setReady(Lock&) {
void Cluster::initMapCompleted(Lock& l) {
// Called on completion of the initial status map.
QPID_LOG(debug, *this << " initial status map complete. ");
- if (state == INIT) {
- // We have status for all members so we can make join descisions.
+ if (state == PRE_INIT) {
+ // PRE_INIT means we're still in the earlyInitialize phase, in the constructor.
+ // We decide here whether we want to recover from our store.
+ // We won't recover if we are joining an active cluster or our store is dirty.
+ if (store.hasStore() &&
+ (initMap.isActive() || store.getState() == STORE_STATE_DIRTY_STORE))
+ broker.setRecovery(false); // Ditch my current store.
+ state = INIT;
+ }
+ else if (state == INIT) {
+ // INIT means we are past Cluster::initialize().
+
+ // If we're forming an initial cluster (no active members)
+ // then we wait to reach the configured cluster-size
+ if (!initMap.isActive() && initMap.getActualSize() < initMap.getRequiredSize()) {
+ QPID_LOG(info, *this << initMap.getActualSize()
+ << " members, waiting for at least " << initMap.getRequiredSize());
+ return;
+ }
initMap.checkConsistent();
+
elders = initMap.getElders();
QPID_LOG(debug, *this << " elders: " << elders);
if (elders.empty())
@@ -969,7 +1009,8 @@ void Cluster::memberUpdate(Lock& l) {
std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
static const char* STATE[] = {
- "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT"
+ "PRE_INIT", "INIT", "JOINER", "UPDATEE", "CATCHUP",
+ "READY", "OFFER", "UPDATER", "LEFT"
};
assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1);
o << "cluster(" << cluster.self << " " << STATE[cluster.state];
@@ -1009,12 +1050,14 @@ void Cluster::errorCheck(const MemberId&
}
void Cluster::timerWakeup(const MemberId& , const std::string& name, Lock&) {
- timer->deliverWakeup(name);
+ if (state >= CATCHUP) // Pre catchup our timer isn't set up.
+ timer->deliverWakeup(name);
}
void Cluster::timerDrop(const MemberId& , const std::string& name, Lock&) {
QPID_LOG(debug, "Cluster timer drop " << map.getFrameSeq() << ": " << name)
- timer->deliverDrop(name);
+ if (state >= CATCHUP) // Pre catchup our timer isn't set up.
+ timer->deliverDrop(name);
}
bool Cluster::isElder() const {
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=922412&r1=922411&r2=922412&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Fri Mar 12 20:11:31 2010
@@ -180,6 +180,7 @@ class Cluster : private Cpg::Handler, pu
void memberUpdate(Lock&);
void setClusterId(const framing::Uuid&, Lock&);
void erase(const ConnectionId&, Lock&);
+ void requestUpdate(Lock& );
void initMapCompleted(Lock&);
void becomeElder(Lock&);
@@ -251,7 +252,8 @@ class Cluster : private Cpg::Handler, pu
// Local cluster state, cluster map
enum {
- INIT, ///< Establishing inital cluster stattus.
+ PRE_INIT,///< Have not yet received complete initial status map.
+ INIT, ///< Waiting to reach cluster-size.
JOINER, ///< Sent update request, waiting for update offer.
UPDATEE, ///< Stalled receive queue at update offer, waiting for update to complete.
CATCHUP, ///< Update complete, unstalled but has not yet seen own "ready" event.
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp?rev=922412&r1=922411&r2=922412&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp Fri Mar 12 20:11:31 2010
@@ -86,8 +86,7 @@ bool InitialStatusMap::notInitialized(co
}
bool InitialStatusMap::isComplete() const {
- return !map.empty() && find_if(map.begin(), map.end(), ¬Initialized) == map.end()
- && (map.size() >= size);
+ return !map.empty() && find_if(map.begin(), map.end(), ¬Initialized) == map.end();
}
bool InitialStatusMap::transitionToComplete() {
@@ -100,7 +99,7 @@ bool InitialStatusMap::isResendNeeded()
return ret;
}
-bool InitialStatusMap::isActive(const Map::value_type& v) {
+bool InitialStatusMap::isActiveEntry(const Map::value_type& v) {
return v.second && v.second->getActive();
}
@@ -110,10 +109,15 @@ bool InitialStatusMap::hasStore(const Ma
v.second->getStoreState() == STORE_STATE_DIRTY_STORE);
}
+bool InitialStatusMap::isActive() {
+ assert(isComplete());
+ return (find_if(map.begin(), map.end(), &isActiveEntry) != map.end());
+}
+
bool InitialStatusMap::isUpdateNeeded() {
assert(isComplete());
// We need an update if there are any active members.
- if (find_if(map.begin(), map.end(), &isActive) != map.end()) return true;
+ if (isActive()) return true;
// Otherwise it depends on store status, get my own status:
Map::iterator me = map.find(self);
@@ -154,7 +158,7 @@ MemberSet InitialStatusMap::getElders()
Uuid InitialStatusMap::getClusterId() {
assert(isComplete());
assert(!map.empty());
- Map::iterator i = find_if(map.begin(), map.end(), &isActive);
+ Map::iterator i = find_if(map.begin(), map.end(), &isActiveEntry);
if (i != map.end())
return i->second->getClusterId(); // An active member
else
@@ -178,6 +182,7 @@ void InitialStatusMap::checkConsistent()
Uuid clusterId;
Uuid shutdownId;
+ bool initialCluster = !isActive();
for (Map::iterator i = map.begin(); i != map.end(); ++i) {
assert(i->second);
if (i->second->getActive()) ++active;
@@ -193,8 +198,10 @@ void InitialStatusMap::checkConsistent()
++clean;
checkId(clusterId, i->second->getClusterId(),
"Cluster-ID mismatch. Stores belong to different clusters.");
- checkId(shutdownId, i->second->getShutdownId(),
- "Shutdown-ID mismatch. Stores were not shut down together");
+ // Only need shutdownId to match if we are in an initially forming cluster.
+ if (initialCluster)
+ checkId(shutdownId, i->second->getShutdownId(),
+ "Shutdown-ID mismatch. Stores were not shut down together");
break;
}
}
@@ -202,10 +209,13 @@ void InitialStatusMap::checkConsistent()
if (none && (clean+dirty+empty))
throw Exception("Mixing transient and persistent brokers in a cluster");
- // If there are no active members and there are dirty stores there
- // must be at least one clean store.
- if (!active && dirty && !clean)
- throw Exception("Cannot recover, no clean store.");
+ if (map.size() >= size) {
+ // All initial members are present. If there are no active
+ // members and there are dirty stores there must be at least
+ // one clean store.
+ if (!active && dirty && !clean)
+ throw Exception("Cannot recover, no clean store.");
+ }
}
std::string InitialStatusMap::getFirstConfigStr() const {
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h?rev=922412&r1=922411&r2=922412&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h Fri Mar 12 20:11:31 2010
@@ -51,12 +51,18 @@ class InitialStatusMap
/** Process received status */
void received(const MemberId&, const Status& is);
- /**@return true if the map is complete. */
+ /**@return true if the map has an entry for all current cluster members. */
bool isComplete() const;
+
+ size_t getActualSize() const { return map.size(); }
+ size_t getRequiredSize() const { return size; }
+
/**@return true if the map was completed by the last config change or received. */
bool transitionToComplete();
/**@pre isComplete(). @return this node's elders */
MemberSet getElders() const;
+ /**@pre isComplete(). @return True if there are active members of the cluster. */
+ bool isActive();
/**@pre isComplete(). @return True if we need to request an update. */
bool isUpdateNeeded();
/**@pre isComplete(). @return Cluster-wide cluster ID. */
@@ -71,8 +77,9 @@ class InitialStatusMap
private:
typedef std::map<MemberId, boost::optional<Status> > Map;
static bool notInitialized(const Map::value_type&);
- static bool isActive(const Map::value_type&);
+ static bool isActiveEntry(const Map::value_type&);
static bool hasStore(const Map::value_type&);
+
Map map;
MemberSet firstConfig;
MemberId self;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp?rev=922412&r1=922411&r2=922412&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp Fri Mar 12 20:11:31 2010
@@ -33,10 +33,8 @@ Multicaster::Multicaster(Cpg& cpg_,
boost::function<void()> onError_) :
onError(onError_), cpg(cpg_),
queue(boost::bind(&Multicaster::sendMcast, this, _1), poller),
- ready(false)
-{
- queue.start();
-}
+ ready(false), bypass(true)
+{}
void Multicaster::mcastControl(const framing::AMQBody& body, const ConnectionId& id) {
mcast(Event::control(body, id));
@@ -61,10 +59,16 @@ void Multicaster::mcast(const Event& e)
}
}
QPID_LOG(trace, "MCAST " << e);
- queue.push(e);
+ if (bypass) { // direct, don't queue
+ iovec iov = e.toIovec();
+ // FIXME aconway 2010-03-10: should do limited retry.
+ while (!cpg.mcast(&iov, 1))
+ ;
+ }
+ else
+ queue.push(e);
}
-
Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast(const PollableEventQueue::Batch& values) {
try {
PollableEventQueue::Batch::const_iterator i = values.begin();
@@ -86,6 +90,11 @@ Multicaster::PollableEventQueue::Batch::
}
}
+void Multicaster::start() {
+ queue.start();
+ bypass = false;
+}
+
void Multicaster::setReady() {
sys::Mutex::ScopedLock l(lock);
ready = true;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h?rev=922412&r1=922411&r2=922412&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h Fri Mar 12 20:11:31 2010
@@ -41,16 +41,18 @@ class Cpg;
/**
* Multicast to the cluster. Shared, thread safe object.
- *
- * Runs in two modes;
*
- * initializing: Hold connection mcast events. Multicast cluster
- * events directly in the calling thread. This mode is used before
- * joining the cluster where the poller may not yet be active and we
- * want to hold any connection traffic till we join.
+ * holding mode: Hold connection events for later multicast. Cluster
+ * events are never held. Used during PRE_INIT/INIT state when we
+ * want to hold any connection traffic till we are ready in the
+ * cluster.
+ *
+ * bypass mode: Multicast cluster events directly in the calling
+ * thread. This mode is used by the cluster in PRE_INIT state, when
+ * the poller is not yet active.
*
- * ready: normal operation. Queues all mcasts on a pollable queue,
- * multicasts connection and cluster events.
+ * Multicaster is created in bypass+holding mode; these modes are
+ * disabled by start() and setReady() respectively.
*/
class Multicaster
{
@@ -65,7 +67,9 @@ class Multicaster
void mcastBuffer(const char*, size_t, const ConnectionId&);
void mcast(const Event& e);
- /** Switch to ready mode. */
+ /** Start the pollable queue, turn off bypass mode. */
+ void start();
+ /** Switch to ready mode, release held messages. */
void setReady();
private:
@@ -81,6 +85,7 @@ class Multicaster
bool ready;
PlainEventQueue holdingQueue;
std::vector<struct ::iovec> ioVector;
+ bool bypass;
};
}} // namespace qpid::cluster
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h?rev=922412&r1=922411&r2=922412&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h Fri Mar 12 20:11:31 2010
@@ -31,6 +31,13 @@ namespace cluster {
/**
* More convenient version of PollableQueue that handles iterating
* over the batch and error handling.
+ *
+ * Constructed in "bypass" mode where items are processed directly
+ * rather than put on the queue. This is important for the
+ * PRE_INIT stage when Cluster is pumping CPG dispatch directly
+ * before the poller has started.
+ *
+ * Calling start() starts the pollable queue and disables bypass mode.
*/
template <class T> class PollableQueue : public sys::PollableQueue<T> {
public:
@@ -41,7 +48,7 @@ template <class T> class PollableQueue :
const boost::shared_ptr<sys::Poller>& poller)
: sys::PollableQueue<T>(boost::bind(&PollableQueue<T>::handleBatch, this, _1),
poller),
- callback(f), error(err), message(msg)
+ callback(f), error(err), message(msg), bypass(true)
{}
typename sys::PollableQueue<T>::Batch::const_iterator
@@ -62,10 +69,21 @@ template <class T> class PollableQueue :
}
}
+ void push(const T& t) {
+ if (bypass) callback(t);
+ else sys::PollableQueue<T>::push(t);
+ }
+
+ void start() {
+ bypass = false;
+ sys::PollableQueue<T>::start();
+ }
+
private:
Callback callback;
ErrorCallback error;
std::string message;
+ bool bypass;
};
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp?rev=922412&r1=922411&r2=922412&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp Fri Mar 12 20:11:31 2010
@@ -21,6 +21,7 @@
#include "StoreStatus.h"
#include "qpid/Exception.h"
#include "qpid/Msg.h"
+#include "qpid/log/Statement.h"
#include <boost/filesystem/path.hpp>
#include <boost/filesystem/fstream.hpp>
#include <boost/filesystem/operations.hpp>
@@ -54,24 +55,39 @@ Uuid loadUuid(const fs::path& path) {
Uuid ret;
if (exists(path)) {
fs::ifstream i(path);
- throw_exceptions(i);
- i >> ret;
+ try {
+ throw_exceptions(i);
+ i >> ret;
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Cant load UUID from " << path.string() << ": " << e.what());
+ throw;
+ }
}
return ret;
}
void saveUuid(const fs::path& path, const Uuid& uuid) {
fs::ofstream o(path);
- throw_exceptions(o);
- o << uuid;
+ try {
+ throw_exceptions(o);
+ o << uuid;
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Cant save UUID to " << path.string() << ": " << e.what());
+ throw;
+ }
}
framing::SequenceNumber loadSeqNum(const fs::path& path) {
uint32_t n = 0;
if (exists(path)) {
fs::ifstream i(path);
- throw_exceptions(i);
- i >> n;
+ try {
+ throw_exceptions(i);
+ i >> n;
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Cant load sequence number from " << path.string() << ": " << e.what());
+ throw;
+ }
}
return framing::SequenceNumber(n);
}
@@ -105,9 +121,14 @@ void StoreStatus::save() {
create_directory(dir);
saveUuid(dir/CLUSTER_ID_FILE, clusterId);
saveUuid(dir/SHUTDOWN_ID_FILE, shutdownId);
- fs::ofstream o(dir/CONFIG_SEQ_FILE);
- throw_exceptions(o);
- o << configSeq.getValue();
+ try {
+ fs::ofstream o(dir/CONFIG_SEQ_FILE);
+ throw_exceptions(o);
+ o << configSeq.getValue();
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Cant save sequence number to " << (dir/CONFIG_SEQ_FILE).string() << ": " << e.what());
+ throw;
+ }
}
catch (const std::exception&e) {
throw Exception(QPID_MSG("Cannot save cluster store status: " << e.what()));
Modified: qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp?rev=922412&r1=922411&r2=922412&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp Fri Mar 12 20:11:31 2010
@@ -173,20 +173,6 @@ QPID_AUTO_TEST_CASE(testInteveningConfig
BOOST_CHECK_EQUAL(map.getClusterId(), id);
}
-QPID_AUTO_TEST_CASE(testInitialSize) {
- InitialStatusMap map(MemberId(0), 3);
- map.configChange(list_of<MemberId>(0)(1));
- map.received(MemberId(0), newcomerStatus());
- map.received(MemberId(1), newcomerStatus());
- BOOST_CHECK(!map.isComplete());
-
- map.configChange(list_of<MemberId>(0)(1)(2));
- map.received(MemberId(0), newcomerStatus());
- map.received(MemberId(1), newcomerStatus());
- map.received(MemberId(2), newcomerStatus());
- BOOST_CHECK(map.isComplete());
-}
-
QPID_AUTO_TEST_CASE(testAllCleanNoUpdate) {
InitialStatusMap map(MemberId(0), 3);
map.configChange(list_of<MemberId>(0)(1)(2));
@@ -244,8 +230,6 @@ QPID_AUTO_TEST_CASE(testEmptyAlone) {
BOOST_CHECK(!map.isUpdateNeeded());
}
-// FIXME aconway 2009-11-20: consistency tests for mixed stores,
-
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=922412&r1=922411&r2=922412&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Fri Mar 12 20:11:31 2010
@@ -29,9 +29,19 @@ from itertools import chain
log = getLogger("qpid.cluster_tests")
+# Note: brokers that shut themselves down due to critical error during
+# normal operation will still have an exit code of 0. Brokers that
+# shut down because of an error found during initialize will exit with
+# a non-0 code. Hence the apparently inconsistent use of EXPECT_EXIT_OK
+# and EXPECT_EXIT_FAIL in some of the tests below.
+
+# FIXME aconway 2010-03-11: resolve this - ideally any exit due to an error
+# should give non-0 exit status.
+
# Import scripts as modules
qpid_cluster=import_script(checkenv("QPID_CLUSTER_EXEC"))
+
def readfile(filename):
"""Returns te content of file named filename as a string"""
f = file(filename)
@@ -287,6 +297,11 @@ class StoreTests(BrokerTest):
m = cluster.start("restartme").get_message("q")
self.assertEqual("x", m.content)
+ def stop_cluster(self,broker):
+ """Clean shut-down of a cluster"""
+ self.assertEqual(0, qpid_cluster.main(
+ ["qpid-cluster", "-kf", broker.host_port()]))
+
def test_persistent_restart(self):
"""Verify persistent cluster shutdown/restart scenarios"""
cluster = self.cluster(0, args=self.args() + ["--cluster-size=3"])
@@ -302,7 +317,7 @@ class StoreTests(BrokerTest):
self.assertEqual(c.get_message("q").content, "2")
# Shut down the entire cluster cleanly and bring it back up
a.send_message("q", Message("3", durable=True))
- self.assertEqual(0, qpid_cluster.main(["qpid-cluster", "-kf", a.host_port()]))
+ self.stop_cluster(a)
a = cluster.start("a", wait=False)
b = cluster.start("b", wait=False)
c = cluster.start("c", wait=True)
@@ -320,7 +335,7 @@ class StoreTests(BrokerTest):
b.kill()
self.assertEqual(c.get_message("q").content, "4")
c.send_message("q", Message("clean", durable=True))
- self.assertEqual(0, qpid_cluster.main(["qpid-cluster", "-kf", c.host_port()]))
+ self.stop_cluster(c)
a = cluster.start("a", wait=False)
b = cluster.start("b", wait=False)
c = cluster.start("c", wait=True)
@@ -333,7 +348,7 @@ class StoreTests(BrokerTest):
a.terminate()
cluster2 = self.cluster(1, args=self.args())
try:
- a = cluster2.start("a", expect=EXPECT_EXIT_OK)
+ a = cluster2.start("a", expect=EXPECT_EXIT_FAIL)
a.ready()
self.fail("Expected exception")
except: pass
@@ -343,27 +358,29 @@ class StoreTests(BrokerTest):
cluster = self.cluster(0, args=self.args()+["--cluster-size=2"])
a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False)
- self.assertEqual(0, qpid_cluster.main(["qpid_cluster", "-kf", a.host_port()]))
+ self.stop_cluster(a)
self.assertEqual(a.wait(), 0)
self.assertEqual(b.wait(), 0)
# Restart with a different member and shut down.
a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
c = cluster.start("c", expect=EXPECT_EXIT_OK, wait=False)
- self.assertEqual(0, qpid_cluster.main(["qpid_cluster", "-kf", a.host_port()]))
+ self.stop_cluster(a)
self.assertEqual(a.wait(), 0)
self.assertEqual(c.wait(), 0)
-
# Mix members from both shutdown events, they should fail
- a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
- b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False)
+ # FIXME aconway 2010-03-11: can't predict the exit status of these
+ # as it depends on the order of delivery of initial-status messages.
+ # See comment at top of this file.
+ a = cluster.start("a", expect=EXPECT_UNKNOWN, wait=False)
+ b = cluster.start("b", expect=EXPECT_UNKNOWN, wait=False)
self.assertRaises(Exception, lambda: a.ready())
self.assertRaises(Exception, lambda: b.ready())
def assert_dirty_store(self, broker):
- self.assertRaises(Exception, lambda: broker.ready())
+ assert retry(lambda: os.path.exists(broker.log)), "Missing log file %s"%broker.log
msg = re.compile("critical.*no clean store")
- assert msg.search(readfile(broker.log))
+ assert retry(lambda: msg.search(readfile(broker.log))), "Expected dirty store message in %s"%broker.log
def test_solo_store_clean(self):
# A single node cluster should always leave a clean store.
@@ -375,7 +392,6 @@ class StoreTests(BrokerTest):
self.assertEqual(a.get_message("q").content, "x")
def test_last_store_clean(self):
-
# Verify that only the last node in a cluster to shut down has
# a clean store. Start with cluster of 3, reduce to 1 then
# increase again to ensure that a node that was once alone but
@@ -394,13 +410,41 @@ class StoreTests(BrokerTest):
time.sleep(0.1) # pause for a to find out hes last.
a.kill() # really last
# b & c should be dirty
- b = cluster.start("b", wait=False, expect=EXPECT_EXIT_OK)
+ b = cluster.start("b", wait=False, expect=EXPECT_EXIT_FAIL)
self.assert_dirty_store(b)
- c = cluster.start("c", wait=False, expect=EXPECT_EXIT_OK)
+ c = cluster.start("c", wait=False, expect=EXPECT_EXIT_FAIL)
self.assert_dirty_store(c)
# a should be clean
a = cluster.start("a")
self.assertEqual(a.get_message("q").content, "x")
-
+ def test_restart_clean(self):
+ """Verify that we can re-start brokers one by one in a
+ persistent cluster after a clean shutdown"""
+ cluster = self.cluster(0, self.args())
+ a = cluster.start("a", expect=EXPECT_EXIT_OK)
+ b = cluster.start("b", expect=EXPECT_EXIT_OK)
+ c = cluster.start("c", expect=EXPECT_EXIT_OK)
+ a.send_message("q", Message("x", durable=True))
+ self.stop_cluster(a)
+ a = cluster.start("a")
+ b = cluster.start("b")
+ c = cluster.start("c")
+ self.assertEqual(c.get_message("q").content, "x")
+
+ def test_join_sub_size(self):
+ """Verify that after starting a cluster with cluster-size=N,
+ we can join new members even if size < N-1"""
+ cluster = self.cluster(0, self.args())
+ a = cluster.start("a", wait=False, expect=EXPECT_EXIT_FAIL)
+ b = cluster.start("b", wait=False, expect=EXPECT_EXIT_FAIL)
+ c = cluster.start("c")
+ a.send_message("q", Message("x", durable=True))
+ a.send_message("q", Message("y", durable=True))
+ a.kill()
+ b.kill()
+ a = cluster.start("a")
+ self.assertEqual(c.get_message("q").content, "x")
+ b = cluster.start("b")
+ self.assertEqual(c.get_message("q").content, "y")
Modified: qpid/trunk/qpid/python/qpid/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/brokertest.py?rev=922412&r1=922411&r2=922412&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/brokertest.py (original)
+++ qpid/trunk/qpid/python/qpid/brokertest.py Fri Mar 12 20:11:31 2010
@@ -268,6 +268,7 @@ class Broker(Popen):
test.cleanup_stop(self)
self._host = "localhost"
log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log))
+ self._log_ready = False
def host(self): return self._host
@@ -343,12 +344,14 @@ class Broker(Popen):
def log_ready(self):
"""Return true if the log file exists and contains a broker ready message"""
+ if self._log_ready: return True
if not os.path.exists(self.log): return False
- ready_msg = re.compile("notice Broker running")
f = file(self.log)
try:
for l in f:
- if ready_msg.search(l): return True
+ if "notice Broker running" in l:
+ self._log_ready = True
+ return True
return False
finally: f.close()
@@ -445,7 +448,7 @@ class BrokerTest(TestCase):
if (wait):
try: b.ready()
except Exception, e:
- raise Exception("Failed to start broker %s: %s" % ( b.name, e))
+ raise Exception("Failed to start broker %s(%s): %s" % (b.name, b.log, e))
return b
def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True):
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org