You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/11/17 19:09:22 UTC
svn commit: r881423 - in /qpid/trunk/qpid/cpp: src/ src/qpid/cluster/
src/qpid/framing/ src/tests/ xml/
Author: aconway
Date: Tue Nov 17 18:09:21 2009
New Revision: 881423
URL: http://svn.apache.org/viewvc?rev=881423&view=rev
Log:
Integrated InitialStatusMap into cluster code.
Added:
qpid/trunk/qpid/cpp/src/qpid/cluster/MemberSet.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/MemberSet.h
- copied, changed from r881420, qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp
Modified:
qpid/trunk/qpid/cpp/src/cluster.mk
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h
qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h
qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp
qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp
qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
qpid/trunk/qpid/cpp/xml/cluster.xml
Modified: qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.mk?rev=881423&r1=881422&r2=881423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ qpid/trunk/qpid/cpp/src/cluster.mk Tue Nov 17 18:09:21 2009
@@ -84,6 +84,8 @@
qpid/cluster/Quorum.h \
qpid/cluster/InitialStatusMap.h \
qpid/cluster/InitialStatusMap.cpp \
+ qpid/cluster/MemberSet.h \
+ qpid/cluster/MemberSet.cpp \
qpid/cluster/types.h
cluster_la_LIBADD= -lcpg $(libcman) libqpidbroker.la libqpidclient.la
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=881423&r1=881422&r2=881423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Nov 17 18:09:21 2009
@@ -162,13 +162,14 @@
ClusterDispatcher(Cluster& c, const MemberId& id, Cluster::Lock& l_) : cluster(c), member(id), l(l_) {}
void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); }
- void initialStatus(bool active, bool persistent, const framing::FieldTable& props) {
- cluster.initialStatus(member, active, persistent, props);
+ void initialStatus(bool active, bool persistent, const Uuid& clusterId,
+ uint32_t version, const std::string& url) {
+ cluster.initialStatus(member, active, persistent, clusterId, version, url, l);
}
void ready(const std::string& url) { cluster.ready(member, url, l); }
void configChange(const std::string& current) { cluster.configChange(member, current, l); }
- void updateOffer(uint64_t updatee, const Uuid& id, uint32_t version) {
- cluster.updateOffer(member, updatee, id, version, l);
+ void updateOffer(uint64_t updatee) {
+ cluster.updateOffer(member, updatee, l);
}
void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); }
void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
@@ -190,6 +191,7 @@
name(settings.name),
myUrl(settings.url.empty() ? Url() : Url(settings.url)),
self(cpg.self()),
+ clusterId(true),
expiryPolicy(new ExpiryPolicy(mcast, self, broker.getTimer())),
mcast(cpg, poller, boost::bind(&Cluster::leave, this)),
dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)),
@@ -206,6 +208,7 @@
decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
discarding(true),
state(INIT),
+ initMap(self),
lastSize(0),
lastBroker(false),
updateRetracted(false),
@@ -265,8 +268,8 @@
// Called in connection thread to insert an updated shadow connection.
void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) {
QPID_LOG(info, *this << " new shadow connection " << c->getId());
- // Safe to use connections here because we're pre-catchup, either
- // discarding or stalled, so deliveredFrame is not processing any
+ // Safe to use connections here because we're pre-catchup, stalled
+ // and discarding, so deliveredFrame is not processing any
// connection events.
assert(discarding);
pair<ConnectionMap::iterator, bool> ib
@@ -522,7 +525,8 @@
const cpg_address *joined, int nJoined)
{
Mutex::ScopedLock l(lock);
- if (state == INIT) { // First config change.
+ if (state == INIT) {
+ // FIXME aconway 2009-11-16: persistent restart
// Recover only if we are first in cluster.
broker.setRecovery(nCurrent == 1);
initialized = true;
@@ -545,39 +549,55 @@
broker.getQueueEvents().enable();
}
-void Cluster::configChange(const MemberId&, const std::string& current, Lock& l) {
- bool memberChange = map.configChange(current);
- QPID_LOG(debug, *this << " applied config change: " << map);
+void Cluster::initMapCompleted(Lock& l) {
+ if (state == INIT) {
+ elders = initMap.getElders();
+ if (!elders.empty()) { // I'm not the elder, I don't handle links & replication.
+ broker.getLinks().setPassive(true);
+ broker.getQueueEvents().disable();
+ }
+ setClusterId(initMap.getClusterId(), l);
+ if (initMap.isUpdateNeeded()) { // Joining established cluster.
+ state = JOINER;
+ mcast.mcastControl(
+ ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
+ }
+ else { // I can go ready.
+ QPID_LOG(notice, *this << " ready.");
+ discarding = false;
+ setReady(l);
+ map = ClusterMap(initMap.getMemberUrls());
+ memberUpdate(l);
+ }
+ }
+}
+
+void Cluster::configChange(const MemberId&, const std::string& configStr, Lock& l) {
if (state == LEFT) return;
-
- if (!map.isAlive(self)) { // Final config change.
+
+ MemberSet config = decodeMemberSet(configStr);
+ elders = intersection(elders, config);
+ if (elders.empty() && INIT < state && state < CATCHUP) {
+ QPID_LOG(critical, "Cannot update, all potential updaters left the cluster.");
leave(l);
return;
}
+ bool memberChange = map.configChange(config);
- if (state == INIT) { // First configChange
- if (map.aliveCount() == 1) {
- setClusterId(true, l);
- discarding = false;
- setReady(l);
- map = ClusterMap(self, myUrl, true);
- memberUpdate(l);
- QPID_LOG(notice, *this << " first in cluster");
- }
- else { // Joining established group.
- state = JOINER;
- mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
- elders = map.getAlive();
- elders.erase(self);
- broker.getLinks().setPassive(true);
- broker.getQueueEvents().disable();
- }
- }
- else if (state >= CATCHUP && memberChange) {
+ // Update initial status for new members joining.
+ initMap.configChange(config);
+ if (initMap.isResendNeeded()) {
+ mcast.mcastControl(
+ // FIXME aconway 2009-11-17: persistent restart, set persistence bit.
+ ClusterInitialStatusBody(ProtocolVersion(), state > INIT, false, clusterId,
+ CLUSTER_VERSION, myUrl.str()), self);
+ }
+ if (initMap.transitionToComplete()) initMapCompleted(l);
+
+ if (state >= CATCHUP && memberChange) {
memberUpdate(l);
- elders = ClusterMap::intersection(elders, map.getAlive());
if (elders.empty()) {
- //assume we are oldest, reactive links if necessary
+ // We are the oldest, reactivate links if necessary
broker.getLinks().setPassive(false);
}
}
@@ -587,8 +607,7 @@
if (state == READY && map.isJoiner(id)) {
state = OFFER;
QPID_LOG(info, *this << " send update-offer to " << id);
- mcast.mcastControl(
- ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId, CLUSTER_VERSION), self);
+ mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id), self);
}
}
@@ -610,10 +629,23 @@
makeOffer(id, l);
}
-void Cluster::initialStatus(const MemberId&, bool /*active*/, bool /*persistent*/,
- const framing::FieldTable&) {
- // FIXME aconway 2009-11-12: fill in.
+void Cluster::initialStatus(const MemberId& member, bool active, bool persistent,
+ const framing::Uuid& id, uint32_t version,
+ const std::string& url, Lock& l)
+{
+ if (version != CLUSTER_VERSION) {
+ QPID_LOG(critical, *this << " incompatible cluster versions " <<
+ version << " != " << CLUSTER_VERSION);
+ leave(l);
+ return;
+ }
+ initMap.received(
+ member,
+ ClusterInitialStatusBody(ProtocolVersion(), active, persistent, id, version, url)
+ );
+ if (initMap.transitionToComplete()) initMapCompleted(l);
}
+
void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
if (map.ready(id, Url(url)))
memberUpdate(l);
@@ -623,17 +655,10 @@
}
}
-void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid,
- uint32_t version, Lock& l) {
+void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) {
// NOTE: deliverEventQueue has been stopped at the update offer by
// deliveredEvent in case an update is required.
if (state == LEFT) return;
- if (version != CLUSTER_VERSION) {
- QPID_LOG(critical, *this << " incompatible cluster versions " <<
- version << " != " << CLUSTER_VERSION);
- leave(l);
- return;
- }
MemberId updatee(updateeInt);
boost::optional<Url> url = map.updateOffer(updater, updatee);
if (updater == self) {
@@ -649,7 +674,6 @@
}
else if (updatee == self && url) {
assert(state == JOINER);
- setClusterId(uuid, l);
state = UPDATEE;
QPID_LOG(notice, *this << " receiving update from " << updater);
checkUpdateIn(l);
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=881423&r1=881422&r2=881423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Tue Nov 17 18:09:21 2009
@@ -19,6 +19,7 @@
*
*/
+#include "InitialStatusMap.h"
#include "ClusterMap.h"
#include "ClusterSettings.h"
#include "Cpg.h"
@@ -144,10 +145,11 @@
// Cluster controls implement XML methods from cluster.xml.
void updateRequest(const MemberId&, const std::string&, Lock&);
- void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&,
- uint32_t version, Lock&);
+ void updateOffer(const MemberId& updater, uint64_t updatee, Lock&);
void retractOffer(const MemberId& updater, uint64_t updatee, Lock&);
- void initialStatus(const MemberId&, bool active, bool persistent, const framing::FieldTable& props);
+ void initialStatus(const MemberId&, bool active, bool persistent,
+ const framing::Uuid& id, uint32_t version,
+ const std::string& url, Lock&);
void ready(const MemberId&, const std::string&, Lock&);
void configChange(const MemberId&, const std::string& current, Lock& l);
void messageExpired(const MemberId&, uint64_t, Lock& l);
@@ -165,6 +167,10 @@
void setClusterId(const framing::Uuid&, Lock&);
void erase(const ConnectionId&, Lock&);
+ void initMapCompleted(Lock&);
+
+
+
// == Called in CPG dispatch thread
void deliver( // CPG deliver callback.
cpg_handle_t /*handle*/,
@@ -241,7 +247,7 @@
// Local cluster state, cluster map
enum {
- INIT, ///< Initial state, no CPG messages received.
+ INIT, ///< Establishing initial cluster status.
JOINER, ///< Sent update request, waiting for update offer.
UPDATEE, ///< Stalled receive queue at update offer, waiting for update to complete.
CATCHUP, ///< Update complete, unstalled but has not yet seen own "ready" event.
@@ -252,8 +258,9 @@
} state;
ConnectionMap connections;
+ InitialStatusMap initMap;
ClusterMap map;
- ClusterMap::Set elders;
+ MemberSet elders;
size_t lastSize;
bool lastBroker;
sys::Thread updateThread;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp?rev=881423&r1=881422&r2=881423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp Tue Nov 17 18:09:21 2009
@@ -28,24 +28,20 @@
#include <iterator>
#include <ostream>
+using namespace std;
+using namespace boost;
+
namespace qpid {
using namespace framing;
namespace cluster {
-ClusterMap::Set ClusterMap::decode(const std::string& s) {
- Set set;
- for (std::string::const_iterator i = s.begin(); i < s.end(); i += 8)
- set.insert(MemberId(std::string(i, i+8)));
- return set;
-}
-
namespace {
void addFieldTableValue(FieldTable::ValueMap::value_type vt, ClusterMap::Map& map, ClusterMap::Set& set) {
MemberId id(vt.first);
set.insert(id);
- std::string url = vt.second->get<std::string>();
+ string url = vt.second->get<string>();
if (!url.empty())
map.insert(ClusterMap::Map::value_type(id, Url(url)));
}
@@ -56,37 +52,34 @@
void assignFieldTable(FieldTable& ft, const ClusterMap::Map& map) {
ft.clear();
- std::for_each(map.begin(), map.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(ft), _1));
+ for_each(map.begin(), map.end(), bind(&insertFieldTableFromMapValue, ref(ft), _1));
}
}
ClusterMap::ClusterMap() : frameSeq(0) {}
-ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) : frameSeq(0) {
- alive.insert(id);
- if (isMember)
- members[id] = url;
- else
- joiners[id] = url;
+ClusterMap::ClusterMap(const Map& map) : frameSeq(0) {
+ transform(map.begin(), map.end(), inserter(alive, alive.begin()), bind(&Map::value_type::first, _1));
+ members = map;
}
ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt, framing::SequenceNumber frameSeq_)
: frameSeq(frameSeq_)
{
- std::for_each(joinersFt.begin(), joinersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(joiners), boost::ref(alive)));
- std::for_each(membersFt.begin(), membersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(members), boost::ref(alive)));
+ for_each(joinersFt.begin(), joinersFt.end(), bind(&addFieldTableValue, _1, ref(joiners), ref(alive)));
+ for_each(membersFt.begin(), membersFt.end(), bind(&addFieldTableValue, _1, ref(members), ref(alive)));
}
void ClusterMap::toMethodBody(framing::ClusterConnectionMembershipBody& b) const {
b.getJoiners().clear();
- std::for_each(joiners.begin(), joiners.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getJoiners()), _1));
+ for_each(joiners.begin(), joiners.end(), bind(&insertFieldTableFromMapValue, ref(b.getJoiners()), _1));
for(Set::const_iterator i = alive.begin(); i != alive.end(); ++i) {
if (!isMember(*i) && !isJoiner(*i))
- b.getJoiners().setString(i->str(), std::string());
+ b.getJoiners().setString(i->str(), string());
}
b.getMembers().clear();
- std::for_each(members.begin(), members.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getMembers()), _1));
+ for_each(members.begin(), members.end(), bind(&insertFieldTableFromMapValue, ref(b.getMembers()), _1));
b.setFrameSeq(frameSeq);
}
@@ -99,21 +92,21 @@
return joiners.empty() ? MemberId() : joiners.begin()->first;
}
-std::vector<string> ClusterMap::memberIds() const {
- std::vector<string> ids;
+vector<string> ClusterMap::memberIds() const {
+ vector<string> ids;
for (Map::const_iterator iter = members.begin();
iter != members.end(); iter++) {
- std::stringstream stream;
+ stringstream stream;
stream << iter->first;
ids.push_back(stream.str());
}
return ids;
}
-std::vector<Url> ClusterMap::memberUrls() const {
- std::vector<Url> urls(members.size());
- std::transform(members.begin(), members.end(), urls.begin(),
- boost::bind(&Map::value_type::second, _1));
+vector<Url> ClusterMap::memberUrls() const {
+ vector<Url> urls(members.size());
+ transform(members.begin(), members.end(), urls.begin(),
+ bind(&Map::value_type::second, _1));
return urls;
}
@@ -121,18 +114,18 @@
ClusterMap::Set ClusterMap::getMembers() const {
Set s;
- std::transform(members.begin(), members.end(), std::inserter(s, s.begin()),
- boost::bind(&Map::value_type::first, _1));
+ transform(members.begin(), members.end(), inserter(s, s.begin()),
+ bind(&Map::value_type::first, _1));
return s;
}
-std::ostream& operator<<(std::ostream& o, const ClusterMap::Map& m) {
- std::ostream_iterator<MemberId> oi(o);
- std::transform(m.begin(), m.end(), oi, boost::bind(&ClusterMap::Map::value_type::first, _1));
+ostream& operator<<(ostream& o, const ClusterMap::Map& m) {
+ ostream_iterator<MemberId> oi(o);
+ transform(m.begin(), m.end(), oi, bind(&ClusterMap::Map::value_type::first, _1));
return o;
}
-std::ostream& operator<<(std::ostream& o, const ClusterMap& m) {
+ostream& operator<<(ostream& o, const ClusterMap& m) {
for (ClusterMap::Set::const_iterator i = m.alive.begin(); i != m.alive.end(); ++i) {
o << *i;
if (m.isMember(*i)) o << "(member)";
@@ -143,7 +136,7 @@
return o;
}
-bool ClusterMap::updateRequest(const MemberId& id, const std::string& url) {
+bool ClusterMap::updateRequest(const MemberId& id, const string& url) {
if (isAlive(id)) {
joiners[id] = Url(url);
return true;
@@ -155,13 +148,12 @@
return isAlive(id) && members.insert(Map::value_type(id,url)).second;
}
-bool ClusterMap::configChange(const std::string& addresses) {
+bool ClusterMap::configChange(const Set& update) {
bool memberChange = false;
- Set update = decode(addresses);
Set removed;
- std::set_difference(alive.begin(), alive.end(),
+ set_difference(alive.begin(), alive.end(),
update.begin(), update.end(),
- std::inserter(removed, removed.begin()));
+ inserter(removed, removed.begin()));
alive = update;
for (Set::const_iterator i = removed.begin(); i != removed.end(); ++i) {
memberChange = memberChange || members.erase(*i);
@@ -170,23 +162,14 @@
return memberChange;
}
-boost::optional<Url> ClusterMap::updateOffer(const MemberId& from, const MemberId& to) {
+optional<Url> ClusterMap::updateOffer(const MemberId& from, const MemberId& to) {
Map::iterator i = joiners.find(to);
if (isAlive(from) && i != joiners.end()) {
Url url= i->second;
joiners.erase(i); // No longer a potential updatee.
return url;
}
- return boost::optional<Url>();
+ return optional<Url>();
}
-ClusterMap::Set ClusterMap::intersection(const ClusterMap::Set& a, const ClusterMap::Set& b)
-{
- Set intersection;
- std::set_intersection(a.begin(), a.end(),
- b.begin(), b.end(),
- std::inserter(intersection, intersection.begin()));
- return intersection;
-
-}
}} // namespace qpid::cluster
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h?rev=881423&r1=881422&r2=881423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h Tue Nov 17 18:09:21 2009
@@ -22,7 +22,7 @@
*
*/
-#include "qpid/cluster/types.h"
+#include "MemberSet.h"
#include "qpid/Url.h"
#include "qpid/framing/ClusterConnectionMembershipBody.h"
#include "qpid/framing/SequenceNumber.h"
@@ -47,16 +47,14 @@
typedef std::map<MemberId, Url> Map;
typedef std::set<MemberId> Set;
- static Set decode(const std::string&);
-
ClusterMap();
- ClusterMap(const MemberId& id, const Url& url, bool isReady);
+ ClusterMap(const Map& map);
ClusterMap(const framing::FieldTable& joiners, const framing::FieldTable& members, framing::SequenceNumber frameSeq);
/** Update from config change.
*@return true if member set changed.
*/
- bool configChange(const std::string& addresses);
+ bool configChange(const Set& members);
bool isJoiner(const MemberId& id) const { return joiners.find(id) != joiners.end(); }
bool isMember(const MemberId& id) const { return members.find(id) != members.end(); }
@@ -85,11 +83,6 @@
/**@return true If this is a new member */
bool ready(const MemberId& id, const Url&);
- /**
- * Utility method to return intersection of two member sets
- */
- static Set intersection(const Set& a, const Set& b);
-
framing::SequenceNumber getFrameSeq() { return frameSeq; }
framing::SequenceNumber incrementFrameSeq() { return ++frameSeq; }
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp?rev=881423&r1=881422&r2=881423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp Tue Nov 17 18:09:21 2009
@@ -105,7 +105,7 @@
QPID_LOG(notice, "Initializing CPG");
cpg_error_t err = cpg_initialize(&handle, &callbacks);
- int retries = 6; // FIXME aconway 2009-08-06: configure, use same config for cman connection.
+ int retries = 6; // FIXME aconway 2009-08-06: make this configurable.
while (err == CPG_ERR_TRY_AGAIN && --retries) {
QPID_LOG(notice, "Re-trying CPG initialization.");
sys::sleep(5);
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp?rev=881423&r1=881422&r2=881423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp Tue Nov 17 18:09:21 2009
@@ -39,11 +39,6 @@
: cluster(c), mcast(c.getMulticast()), frameSeq(0), type(ERROR_TYPE_NONE), connection(0)
{}
-ostream& operator<<(ostream& o, const ErrorCheck::MemberSet& ms) {
- copy(ms.begin(), ms.end(), ostream_iterator<MemberId>(o, " "));
- return o;
-}
-
void ErrorCheck::error(
Connection& c, ErrorType t, framing::SequenceNumber seq, const MemberSet& ms,
const std::string& msg)
@@ -115,7 +110,7 @@
const ClusterConfigChangeBody* configChange =
static_cast<const ClusterConfigChangeBody*>(method);
if (configChange) {
- MemberSet members(ClusterMap::decode(configChange->getCurrent()));
+ MemberSet members(decodeMemberSet(configChange->getCurrent()));
QPID_LOG(debug, cluster << " apply config change to error "
<< frameSeq << ": " << members);
MemberSet intersect;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h?rev=881423&r1=881422&r2=881423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h Tue Nov 17 18:09:21 2009
@@ -22,7 +22,7 @@
*
*/
-#include "qpid/cluster/types.h"
+#include "qpid/cluster/MemberSet.h"
#include "qpid/cluster/Multicaster.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/SequenceNumber.h"
@@ -34,7 +34,6 @@
namespace cluster {
class EventFrame;
-class ClusterMap;
class Cluster;
class Multicaster;
class Connection;
@@ -48,7 +47,6 @@
class ErrorCheck
{
public:
- typedef std::set<MemberId> MemberSet;
typedef framing::cluster::ErrorType ErrorType;
typedef framing::SequenceNumber SequenceNumber;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp?rev=881423&r1=881422&r2=881423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp Tue Nov 17 18:09:21 2009
@@ -29,13 +29,12 @@
namespace cluster {
InitialStatusMap::InitialStatusMap(const MemberId& self_)
- : self(self_), complete(), updateNeeded(), resendNeeded()
-{
- map[self] = optional<Status>();
-}
+ : self(self_), completed(), resendNeeded()
+{}
void InitialStatusMap::configChange(const MemberSet& members) {
resendNeeded = false;
+ bool wasComplete = isComplete();
if (firstConfig.empty()) firstConfig = members;
MemberSet::const_iterator i = members.begin();
Map::iterator j = map.begin();
@@ -66,10 +65,13 @@
for (Map::iterator i = map.begin(); i != map.end(); ++i)
i->second = optional<Status>();
}
+ completed = isComplete() && !wasComplete; // Set completed on the transition.
}
void InitialStatusMap::received(const MemberId& m, const Status& s){
+ bool wasComplete = isComplete();
map[m] = s;
+ completed = isComplete() && !wasComplete; // Set completed on the transition.
}
bool InitialStatusMap::notInitialized(const Map::value_type& v) {
@@ -81,7 +83,11 @@
}
bool InitialStatusMap::isComplete() {
- return find_if(map.begin(), map.end(), &notInitialized) == map.end();
+ return !map.empty() && find_if(map.begin(), map.end(), &notInitialized) == map.end();
+}
+
+bool InitialStatusMap::transitionToComplete() {
+ return completed;
}
bool InitialStatusMap::isResendNeeded() {
@@ -107,4 +113,26 @@
return elders;
}
+// Get cluster ID from an active member or the youngest newcomer.
+framing::Uuid InitialStatusMap::getClusterId() {
+ assert(isComplete());
+ assert(!map.empty());
+ Map::iterator i = find_if(map.begin(), map.end(), &isActive);
+ if (i != map.end())
+ return i->second->getClusterId(); // An active member
+ else
+ return map.begin()->second->getClusterId();
+}
+
+std::map<MemberId, Url> InitialStatusMap::getMemberUrls() {
+ assert(isComplete());
+ assert(!isUpdateNeeded());
+ std::map<MemberId, Url> urlMap;
+ for (Map::iterator i = map.begin(); i != map.end(); ++i) {
+ assert(i->second);
+ urlMap.insert(std::make_pair(i->first, i->second->getUrl()));
+ }
+ return urlMap;
+}
+
}} // namespace qpid::cluster
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h?rev=881423&r1=881422&r2=881423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h Tue Nov 17 18:09:21 2009
@@ -22,7 +22,7 @@
*
*/
-#include "types.h"
+#include "MemberSet.h"
#include <qpid/framing/ClusterInitialStatusBody.h>
#include <boost/optional.hpp>
@@ -48,10 +48,19 @@
/**@return true if the map is complete. */
bool isComplete();
- /**@pre isComplete. @return this node's elders */
+ /**@return true if the map was completed by the last config change or received. */
+ bool transitionToComplete();
+ /**@pre isComplete(). @return this node's elders */
MemberSet getElders();
- /**@pre isComplete. @return True if we need an update. */
+ /**@pre isComplete(). @return True if we need an update. */
bool isUpdateNeeded();
+ /**@pre isComplete(). @return Cluster-wide cluster ID. */
+ framing::Uuid getClusterId();
+
+ /**@pre isComplete() && !isUpdateNeeded().
+ *@return member->URL map for all members.
+ */
+ std::map<MemberId, Url> getMemberUrls();
private:
typedef std::map<MemberId, boost::optional<Status> > Map;
@@ -61,7 +70,7 @@
Map map;
MemberSet firstConfig;
MemberId self;
- bool complete, updateNeeded, resendNeeded;
+ bool completed, resendNeeded;
};
}} // namespace qpid::cluster
Added: qpid/trunk/qpid/cpp/src/qpid/cluster/MemberSet.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberSet.cpp?rev=881423&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/MemberSet.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/MemberSet.cpp Tue Nov 17 18:09:21 2009
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "MemberSet.h"
+#include <ostream>
+
+namespace qpid {
+namespace cluster {
+
+MemberSet decodeMemberSet(const std::string& s) {
+ MemberSet set;
+ for (std::string::const_iterator i = s.begin(); i < s.end(); i += 8) {
+ assert(size_t(i-s.begin())+8 <= s.size());
+ set.insert(MemberId(std::string(i, i+8)));
+ }
+ return set;
+}
+
+MemberSet intersection(const MemberSet& a, const MemberSet& b)
+{
+ MemberSet intersection;
+ std::set_intersection(a.begin(), a.end(),
+ b.begin(), b.end(),
+ std::inserter(intersection, intersection.begin()));
+ return intersection;
+
+}
+
+std::ostream& operator<<(std::ostream& o, const MemberSet& ms) {
+ copy(ms.begin(), ms.end(), std::ostream_iterator<MemberId>(o, " "));
+ return o;
+}
+
+}} // namespace qpid::cluster
Copied: qpid/trunk/qpid/cpp/src/qpid/cluster/MemberSet.h (from r881420, qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberSet.h?p2=qpid/trunk/qpid/cpp/src/qpid/cluster/MemberSet.h&p1=qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp&r1=881420&r2=881423&rev=881423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/MemberSet.h Tue Nov 17 18:09:21 2009
@@ -1,3 +1,6 @@
+#ifndef QPID_CLUSTER_MEMBERSET_H
+#define QPID_CLUSTER_MEMBERSET_H
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -7,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,27 +21,23 @@
* under the License.
*
*/
-#include "qpid/framing/AMQContentBody.h"
-#include <iostream>
-qpid::framing::AMQContentBody::AMQContentBody(){
-}
+#include "types.h"
+#include <set>
+#include <iosfwd>
+
+namespace qpid {
+namespace cluster {
+
+typedef std::set<MemberId> MemberSet;
+
+MemberSet decodeMemberSet(const std::string&);
+
+MemberSet intersection(const MemberSet& a, const MemberSet& b);
+
+std::ostream& operator<<(std::ostream& o, const MemberSet& ms);
+
-qpid::framing::AMQContentBody::AMQContentBody(const string& _data) : data(_data){
-}
+}} // namespace qpid::cluster
-uint32_t qpid::framing::AMQContentBody::encodedSize() const{
- return data.size();
-}
-void qpid::framing::AMQContentBody::encode(Buffer& buffer) const{
- buffer.putRawData(data);
-}
-void qpid::framing::AMQContentBody::decode(Buffer& buffer, uint32_t _size){
- buffer.getRawData(data, _size);
-}
-
-void qpid::framing::AMQContentBody::print(std::ostream& out) const
-{
- out << "content (" << encodedSize() << " bytes)";
- out << " " << data.substr(0,16) << "...";
-}
+#endif /*!QPID_CLUSTER_MEMBERSET_H*/
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h?rev=881423&r1=881422&r2=881423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/types.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/types.h Tue Nov 17 18:09:21 2009
@@ -29,8 +29,6 @@
#include <utility>
#include <iosfwd>
#include <string>
-#include <set>
-
extern "C" {
#if defined (HAVE_OPENAIS_CPG_H)
@@ -76,8 +74,6 @@
uint64_t getNumber() const { return second; }
};
-typedef std::set<MemberId> MemberSet;
-
std::ostream& operator<<(std::ostream&, const ConnectionId&);
std::ostream& operator<<(std::ostream&, EventType);
Modified: qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp?rev=881423&r1=881422&r2=881423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp Tue Nov 17 18:09:21 2009
@@ -40,5 +40,7 @@
void qpid::framing::AMQContentBody::print(std::ostream& out) const
{
out << "content (" << encodedSize() << " bytes)";
- out << " " << data.substr(0,16) << "...";
+ const size_t max = 32;
+ out << " " << data.substr(0, max);
+ if (data.size() > max) out << "...";
}
Modified: qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp?rev=881423&r1=881422&r2=881423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp Tue Nov 17 18:09:21 2009
@@ -20,6 +20,7 @@
#include "unit_test.h"
#include "test_tools.h"
#include "qpid/cluster/InitialStatusMap.h"
+#include "qpid/framing/Uuid.h"
#include <boost/assign.hpp>
using namespace std;
@@ -34,58 +35,72 @@
typedef InitialStatusMap::Status Status;
-Status activeStatus() { return Status(ProtocolVersion(), true, false, FieldTable()); }
-Status newcomerStatus() { return Status(ProtocolVersion(), false, false, FieldTable()); }
+Status activeStatus(const Uuid& id=Uuid()) { return Status(ProtocolVersion(), true, false, id, 0, ""); }
+Status newcomerStatus(const Uuid& id=Uuid()) { return Status(ProtocolVersion(), false, false, id, 0, ""); }
QPID_AUTO_TEST_CASE(testFirstInCluster) {
// Single member is first in cluster.
InitialStatusMap map(MemberId(0));
+ Uuid id(true);
BOOST_CHECK(!map.isComplete());
MemberSet members = list_of(MemberId(0));
map.configChange(members);
BOOST_CHECK(!map.isComplete());
- map.received(MemberId(0), newcomerStatus());
+ map.received(MemberId(0), newcomerStatus(id));
BOOST_CHECK(map.isComplete());
+ BOOST_CHECK(map.transitionToComplete());
BOOST_CHECK(map.getElders().empty());
BOOST_CHECK(!map.isUpdateNeeded());
+ BOOST_CHECK_EQUAL(id, map.getClusterId());
}
QPID_AUTO_TEST_CASE(testJoinExistingCluster) {
// Single member 0 joins existing cluster 1,2
InitialStatusMap map(MemberId(0));
+ Uuid id(true);
MemberSet members = list_of(MemberId(0))(MemberId(1))(MemberId(2));
map.configChange(members);
BOOST_CHECK(map.isResendNeeded());
BOOST_CHECK(!map.isComplete());
map.received(MemberId(0), newcomerStatus());
- map.received(MemberId(1), activeStatus());
+ map.received(MemberId(1), activeStatus(id));
BOOST_CHECK(!map.isComplete());
- map.received(MemberId(2), activeStatus());
+ map.received(MemberId(2), activeStatus(id));
BOOST_CHECK(map.isComplete());
+ BOOST_CHECK(map.transitionToComplete());
BOOST_CHECK_EQUAL(map.getElders(), list_of<MemberId>(1)(2));
BOOST_CHECK(map.isUpdateNeeded());
+ BOOST_CHECK_EQUAL(map.getClusterId(), id);
+
+ // Check that transitionToComplete is reset.
+ map.configChange(list_of<MemberId>(0)(1));
+ BOOST_CHECK(!map.transitionToComplete());
}
QPID_AUTO_TEST_CASE(testMultipleFirstInCluster) {
// Multiple members 0,1,2 join at same time.
InitialStatusMap map(MemberId(1)); // self is 1
+ Uuid id(true);
MemberSet members = list_of(MemberId(0))(MemberId(1))(MemberId(2));
map.configChange(members);
BOOST_CHECK(map.isResendNeeded());
// All new members
- map.received(MemberId(0), newcomerStatus());
+ map.received(MemberId(0), newcomerStatus(id));
map.received(MemberId(1), newcomerStatus());
map.received(MemberId(2), newcomerStatus());
BOOST_CHECK(!map.isResendNeeded());
BOOST_CHECK(map.isComplete());
+ BOOST_CHECK(map.transitionToComplete());
BOOST_CHECK_EQUAL(map.getElders(), list_of(MemberId(2)));
BOOST_CHECK(!map.isUpdateNeeded());
+ BOOST_CHECK_EQUAL(map.getClusterId(), id);
}
QPID_AUTO_TEST_CASE(testMultipleJoinExisting) {
// Multiple members 1,2,3 join existing cluster containing 0.
InitialStatusMap map(MemberId(2)); // self is 2
+ Uuid id(true);
MemberSet members = list_of(MemberId(0))(MemberId(1))(MemberId(2))(MemberId(3));
map.configChange(members);
BOOST_CHECK(map.isResendNeeded());
@@ -93,28 +108,34 @@
map.received(MemberId(1), newcomerStatus());
map.received(MemberId(2), newcomerStatus());
map.received(MemberId(3), newcomerStatus());
- map.received(MemberId(0), activeStatus());
+ map.received(MemberId(0), activeStatus(id));
BOOST_CHECK(!map.isResendNeeded());
BOOST_CHECK(map.isComplete());
+ BOOST_CHECK(map.transitionToComplete());
BOOST_CHECK_EQUAL(map.getElders(), list_of(MemberId(0))(MemberId(3)));
BOOST_CHECK(map.isUpdateNeeded());
+ BOOST_CHECK_EQUAL(map.getClusterId(), id);
}
QPID_AUTO_TEST_CASE(testMembersLeave) {
// Test that map completes if members leave rather than send status.
InitialStatusMap map(MemberId(0));
+ Uuid id(true);
map.configChange(list_of(MemberId(0))(MemberId(1))(MemberId(2)));
map.received(MemberId(0), newcomerStatus());
- map.received(MemberId(1), activeStatus());
+ map.received(MemberId(1), activeStatus(id));
BOOST_CHECK(!map.isComplete());
map.configChange(list_of(MemberId(0))(MemberId(1))); // 2 left
BOOST_CHECK(map.isComplete());
+ BOOST_CHECK(map.transitionToComplete());
BOOST_CHECK_EQUAL(map.getElders(), list_of(MemberId(1)));
+ BOOST_CHECK_EQUAL(map.getClusterId(), id);
}
QPID_AUTO_TEST_CASE(testInteveningConfig) {
// Multiple config changes arrive before we complete the map.
InitialStatusMap map(MemberId(0));
+ Uuid id(true);
map.configChange(list_of<MemberId>(0)(1));
BOOST_CHECK(map.isResendNeeded());
@@ -125,7 +146,7 @@
map.configChange(list_of<MemberId>(0)(1)(2));
BOOST_CHECK(!map.isComplete());
BOOST_CHECK(map.isResendNeeded());
- map.received(1, activeStatus());
+ map.received(1, activeStatus(id));
map.received(2, newcomerStatus());
// We should not be complete as we haven't received 0 since new member joined
BOOST_CHECK(!map.isComplete());
@@ -133,7 +154,9 @@
map.received(0, newcomerStatus());
BOOST_CHECK(map.isComplete());
+ BOOST_CHECK(map.transitionToComplete());
BOOST_CHECK_EQUAL(map.getElders(), list_of<MemberId>(1));
+ BOOST_CHECK_EQUAL(map.getClusterId(), id);
}
QPID_AUTO_TEST_SUITE_END()
Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=881423&r1=881422&r2=881423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Tue Nov 17 18:09:21 2009
@@ -29,6 +29,11 @@
class ClusterTests(BrokerTest):
"""Cluster tests with support for testing with a store plugin."""
+ def duration(self):
+ d = self.config.defines.get("DURATION")
+ if d: return float(d)*60
+ else: return 3
+
def test_message_replication(self):
"""Test basic cluster message replication."""
# Start a cluster, send some messages to member 0.
@@ -66,11 +71,10 @@
sender = NumberedSender(cluster[2])
sender.start()
- # Kill original brokers, start new ones.
- endtime = time.time() + (int(self.config.defines.get("DURATION") or 3))
+ # Kill original brokers, start new ones for the duration.
+ endtime = time.time() + self.duration()
i = 0
while time.time() < endtime:
- print time.time(), endtime
cluster[i].kill()
i += 1
b = cluster.start(expect=EXPECT_EXIT_FAIL)
Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=881423&r1=881422&r2=881423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Tue Nov 17 18:09:21 2009
@@ -40,8 +40,6 @@
<!-- Sender offers an update to a new joiner. -->
<control name = "update-offer" code="0x2">
<field name="updatee" type="uint64"/>
- <field name="cluster-id" type="uuid"/>
- <field name="version" type="uint32"/>
</control>
<!-- Sender retracts an offer to a new joiner. -->
@@ -53,7 +51,9 @@
<control name="initial-status" code="0x4">
<field name="active" type="bit"/>
<field name="persistent" type="bit"/>
- <field name="properties" type="map"/>>
+ <field name="cluster-id" type="uuid"/>>
+ <field name="version" type="uint32"/>
+ <field name="url" type="str16"/>>
</control>
<!-- New member or updater is ready as an active member. -->
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org