You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/03/30 22:57:13 UTC
svn commit: r929274 - in /qpid/trunk/qpid/cpp: src/qpid/cluster/Cluster.cpp
src/qpid/cluster/Cluster.h src/qpid/cluster/ErrorCheck.cpp
src/qpid/cluster/StoreStatus.h xml/cluster.xml
Author: aconway
Date: Tue Mar 30 20:57:12 2010
New Revision: 929274
URL: http://svn.apache.org/viewvc?rev=929274&view=rev
Log:
Cluster logging improvements: log config changes in the deliver thread.
Modified:
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h
qpid/trunk/qpid/cpp/xml/cluster.xml
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=929274&r1=929273&r2=929274&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Mar 30 20:57:12 2010
@@ -217,8 +217,11 @@ struct ClusterDispatcher : public framin
void ready(const std::string& url) {
cluster.ready(member, url, l);
}
- void configChange(const std::string& current) {
- cluster.configChange(member, current, l);
+ void configChange(const std::string& members,
+ const std::string& left,
+ const std::string& joined)
+ {
+ cluster.configChange(member, members, left, joined, l);
}
void updateOffer(uint64_t updatee) {
cluster.updateOffer(member, updatee, l);
@@ -554,40 +557,28 @@ Cluster::ConnectionVector Cluster::getCo
boost::bind(&ConnectionMap::value_type::second, _1));
return result;
}
-
-struct AddrList {
- const cpg_address* addrs;
- int count;
- const char *prefix;
- AddrList(const cpg_address* a, int n, const char* p="")
- : addrs(a), count(n), prefix(p) {}
-};
-
-ostream& operator<<(ostream& o, const AddrList& a) {
- if (!a.count) return o;
- o << a.prefix;
- for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p)
- o << qpid::cluster::MemberId(*p) << " ";
- return o;
-}
+// CPG config-change callback.
void Cluster::configChange (
cpg_handle_t /*handle*/,
const cpg_name */*group*/,
- const cpg_address *current, int nCurrent,
+ const cpg_address *members, int nMembers,
const cpg_address *left, int nLeft,
const cpg_address *joined, int nJoined)
{
Mutex::ScopedLock l(lock);
- QPID_LOG(notice, *this << " membership change: "
- << AddrList(current, nCurrent) << "("
- << AddrList(joined, nJoined, "joined: ")
- << AddrList(left, nLeft, "left: ")
- << ")");
- string addresses;
- for (const cpg_address* p = current; p < current+nCurrent; ++p)
- addresses.append(MemberId(*p).str());
- deliverEvent(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), self));
+ string membersStr, leftStr, joinedStr;
+ // Encode members and enqueue as an event so the config change can
+ // be executed in the correct thread.
+ for (const cpg_address* p = members; p < members+nMembers; ++p)
+ membersStr.append(MemberId(*p).str());
+ for (const cpg_address* p = left; p < left+nLeft; ++p)
+ leftStr.append(MemberId(*p).str());
+ for (const cpg_address* p = joined; p < joined+nJoined; ++p)
+ joinedStr.append(MemberId(*p).str());
+ deliverEvent(Event::control(ClusterConfigChangeBody(
+ ProtocolVersion(), membersStr, leftStr, joinedStr),
+ self));
}
void Cluster::setReady(Lock&) {
@@ -654,22 +645,33 @@ void Cluster::initMapCompleted(Lock& l)
}
}
-void Cluster::configChange(const MemberId&, const std::string& configStr, Lock& l) {
+void Cluster::configChange(const MemberId&,
+ const std::string& membersStr,
+ const std::string& leftStr,
+ const std::string& joinedStr,
+ Lock& l)
+{
if (state == LEFT) return;
+ MemberSet members = decodeMemberSet(membersStr);
+ MemberSet left = decodeMemberSet(leftStr);
+ MemberSet joined = decodeMemberSet(joinedStr);
+ QPID_LOG(notice, *this << " Membership update " << map.getConfigSeq() << ": "
+ << members);
+ QPID_LOG_IF(notice, !left.empty(), *this << " Members left: " << left);
+ QPID_LOG_IF(notice, !joined.empty(), *this << " Members joined: " << joined);
- MemberSet config = decodeMemberSet(configStr);
- elders = intersection(elders, config);
+ // Update initital status for members joining or leaving.
+ elders = intersection(elders, members);
if (elders.empty() && INIT < state && state < CATCHUP) {
QPID_LOG(critical, "Cannot update, all potential updaters left the cluster.");
leave(l);
return;
}
- bool memberChange = map.configChange(config);
- QPID_LOG(debug, "Config sequence " << map.getConfigSeq());
+ bool memberChange = map.configChange(members);
store.setConfigSeq(map.getConfigSeq());
// Update initital status for members joining or leaving.
- initMap.configChange(config);
+ initMap.configChange(members);
if (initMap.isResendNeeded()) {
mcast.mcastControl(
ClusterInitialStatusBody(
@@ -965,8 +967,11 @@ void Cluster::memberUpdate(Lock& l) {
if (store.hasStore()) {
// Mark store clean if I am the only broker, dirty otherwise.
- if (size == 1) store.clean(Uuid(true));
- else store.dirty(clusterId);
+ if (size == 1 ) {
+ if (!store.isClean()) store.clean(Uuid(true));
+ } else {
+ if (!store.isDirty()) store.dirty(clusterId);
+ }
}
if (size == 1 && lastSize > 1 && state >= CATCHUP) {
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=929274&r1=929273&r2=929274&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Tue Mar 30 20:57:12 2010
@@ -163,7 +163,11 @@ class Cluster : private Cpg::Handler, pu
const std::string& firstConfig,
Lock&);
void ready(const MemberId&, const std::string&, Lock&);
- void configChange(const MemberId&, const std::string& current, Lock& l);
+ void configChange(const MemberId&,
+ const std::string& members,
+ const std::string& left,
+ const std::string& joined,
+ Lock& l);
void messageExpired(const MemberId&, uint64_t, Lock& l);
void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&);
void timerWakeup(const MemberId&, const std::string& name, Lock&);
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp?rev=929274&r1=929273&r2=929274&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp Tue Mar 30 20:57:12 2010
@@ -110,7 +110,7 @@ ErrorCheck::FrameQueue::iterator ErrorCh
const ClusterConfigChangeBody* configChange =
static_cast<const ClusterConfigChangeBody*>(method);
if (configChange) {
- MemberSet members(decodeMemberSet(configChange->getCurrent()));
+ MemberSet members(decodeMemberSet(configChange->getMembers()));
QPID_LOG(debug, cluster << " apply config change to error "
<< frameSeq << ": " << members);
MemberSet intersect;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h?rev=929274&r1=929273&r2=929274&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h Tue Mar 30 20:57:12 2010
@@ -42,6 +42,9 @@ class StoreStatus
StoreStatus(const std::string& dir);
framing::cluster::StoreState getState() const { return state; }
+ bool isClean() { return state == framing::cluster::STORE_STATE_CLEAN_STORE; }
+ bool isDirty() { return state == framing::cluster::STORE_STATE_DIRTY_STORE; }
+
const Uuid& getClusterId() const { return clusterId; }
const Uuid& getShutdownId() const { return shutdownId; }
framing::SequenceNumber getConfigSeq() const { return configSeq; }
Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=929274&r1=929273&r2=929274&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Tue Mar 30 20:57:12 2010
@@ -73,7 +73,9 @@
</control>
<control name="config-change" code="0x11" label="Raw cluster membership.">
- <field name="current" type="vbin16"/> <!-- packed member-id array -->
+ <field name="members" type="vbin16"/> <!-- packed member-id array -->
+ <field name="joined" type="vbin16"/> <!-- packed member-id array -->
+ <field name="left" type="vbin16"/> <!-- packed member-id array -->
</control>
<control name="message-expired" code="0x12">
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org