You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/09/26 21:22:01 UTC
svn commit: r699456 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/client/
qpid/cluster/
Author: aconway
Date: Fri Sep 26 12:22:00 2008
New Revision: 699456
URL: http://svn.apache.org/viewvc?rev=699456&view=rev
Log:
cluster:]
- call updateMemberStats() exactly once for each change in cluster membership.
- fix spurious replication of catch-up connection close events.
Removed unused client/MessageQueue.h
Removed:
incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageQueue.h
Modified:
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=699456&r1=699455&r2=699456&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Fri Sep 26 12:22:00 2008
@@ -507,7 +507,6 @@
qpid/client/LocalQueue.h \
qpid/client/Message.h \
qpid/client/MessageListener.h \
- qpid/client/MessageQueue.h \
qpid/client/Results.h \
qpid/client/SessionBase_0_10.h \
qpid/client/Session.h \
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=699456&r1=699455&r2=699456&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Sep 26 12:22:00 2008
@@ -248,7 +248,7 @@
cpg_address *joined, int nJoined)
{
Mutex::ScopedLock l(lock);
- QPID_LOG(debug, "CPG members: " << AddrList(current, nCurrent)
+ QPID_LOG(debug, "Process members: " << AddrList(current, nCurrent)
<< AddrList(left, nLeft, "( ", ")"));
if (find(left, left+nLeft, self) != left+nLeft) {
@@ -258,7 +258,7 @@
return;
}
- map.left(left, nLeft);
+ if (map.left(left, nLeft)) updateMemberStats();
handler->configChange(current, nCurrent, left, nLeft, joined, nJoined);
}
@@ -326,7 +326,7 @@
mcastControl(ClusterShutdownBody(), 0);
}
-void Cluster::updateMemberStats(void) {
+void Cluster::updateMemberStats() {
if (mgmtObject) {
mgmtObject->set_clusterSize(size());
std::vector<Url> vectUrl = getUrls();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp?rev=699456&r1=699455&r2=699456&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp Fri Sep 26 12:22:00 2008
@@ -39,11 +39,14 @@
return (members.empty()) ? MemberId() : members.begin()->first;
}
-void ClusterMap::left(const cpg_address* addrs, size_t size) {
- size_t (Members::*erase)(const MemberId&) = &Members::erase;
- std::for_each(addrs, addrs+size, boost::bind(erase, &members, _1));
+bool ClusterMap::left(const cpg_address* addrs, size_t nLeft) {
+ bool changed=false;
+ for (const cpg_address* a = addrs; a < addrs+nLeft; ++a)
+ changed = members.erase(*a) || changed;
if (dumper && !isMember(dumper))
dumper = MemberId();
+ QPID_LOG_IF(debug, changed, *this);
+ return changed;
}
framing::ClusterUpdateBody ClusterMap::toControl() const {
@@ -54,11 +57,17 @@
return b;
}
-void ClusterMap::update(const framing::FieldTable& ftMembers, uint64_t dumper_) {
- framing:: FieldTable::ValueMap::const_iterator i;
- for (i = ftMembers.begin(); i != ftMembers.end(); ++i)
- members[i->first] = Url(i->second->get<std::string>());
+bool ClusterMap::update(const framing::FieldTable& ftMembers, uint64_t dumper_) {
dumper = MemberId(dumper_);
+ bool changed = false;
+ framing:: FieldTable::ValueMap::const_iterator i;
+ for (i = ftMembers.begin(); i != ftMembers.end(); ++i) {
+ MemberId id(i->first);
+ Url url(i->second->get<std::string>());
+ changed = members.insert(Members::value_type(id, url)).second || changed;
+ }
+ QPID_LOG_IF(debug, changed, *this);
+ return changed;
}
std::vector<Url> ClusterMap::memberUrls() const {
@@ -68,14 +77,12 @@
return result;
}
-std::ostream& operator<<(std::ostream& o, const ClusterMap::Members::value_type& mv) {
- return o << mv.first << "=" << mv.second;
-}
-
std::ostream& operator<<(std::ostream& o, const ClusterMap& m) {
- std::ostream_iterator<ClusterMap::Members::value_type> im(o, "\n ");
- o << "dumper=" << m.dumper << ", members:\n ";
- std::copy(m.members.begin(), m.members.end(), im);
+ o << "Broker members:";
+ for (ClusterMap::Members::const_iterator i=m.members.begin(); i != m.members.end(); ++i) {
+ o << " " << i->first;
+ if (i->first == m.dumper) o << "(dumping)";
+ }
return o;
}
@@ -83,11 +90,17 @@
return dumper==id || (!dumper && first() == id);
}
-void ClusterMap::ready(const MemberId& id, const Url& url) {
- members[id] = url;
- if (id == dumper)
+bool ClusterMap::ready(const MemberId& id, const Url& url) {
+ bool changed = members.insert(Members::value_type(id,url)).second;
+ if (id == dumper) {
dumper = MemberId();
- QPID_LOG(info, id << " joined cluster: " << *this);
+ QPID_LOG(info, id << " finished dump.");
+ }
+ else {
+ QPID_LOG(info, id << " joined cluster, url=" << url);
+ }
+ QPID_LOG_IF(debug, changed, *this);
+ return changed;
}
}} // namespace qpid::cluster
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h?rev=699456&r1=699455&r2=699456&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h Fri Sep 26 12:22:00 2008
@@ -50,17 +50,21 @@
/** First member of the cluster in ID order, gets to perform one-off tasks. */
MemberId first() const;
- /** Update for members leaving. */
- void left(const cpg_address* addrs, size_t size);
+ /** Update for members leaving.
+ *@return true if the cluster membership changed.
+ */
+ bool left(const cpg_address* addrs, size_t size);
/** Convert map contents to a cluster update body. */
framing::ClusterUpdateBody toControl() const;
/** Add a new member or dump complete if id == dumper. */
- void ready(const MemberId& id, const Url& url);
+ bool ready(const MemberId& id, const Url& url);
- /** Apply update delivered from clsuter. */
- void update(const framing::FieldTable& members, uint64_t dumper);
+ /** Apply update delivered from cluster.
+ *@return true if cluster membership changed.
+ **/
+ bool update(const framing::FieldTable& members, uint64_t dumper);
bool isMember(const MemberId& id) const { return members.find(id) != members.end(); }
@@ -72,7 +76,6 @@
private:
friend std::ostream& operator<<(std::ostream&, const ClusterMap&);
- friend std::ostream& operator<<(std::ostream& o, const ClusterMap::Members::value_type& mv);
};
}} // namespace qpid::cluster
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=699456&r1=699455&r2=699456&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Fri Sep 26 12:22:00 2008
@@ -106,23 +106,26 @@
QPID_LOG(debug, "Connection closed " << *this);
if (catchUp) {
- catchUp = false;
cluster.catchUpClosed(boost::intrusive_ptr<Connection>(this));
- if (!isShadow()) connection.closed();
+ if (isShadow())
+ catchUp = false;
+ else
+ connection.closed();
}
-
- // Local network connection has closed. We need to keep the
- // connection around but replace the output handler with a
- // no-op handler as the network output handler will be
- // deleted.
- output.setOutputHandler(discardHandler);
-
- if (isLocal()) {
- // This was a local replicated connection. Multicast a deliver
- // closed and process any outstanding frames from the cluster
- // until self-delivery of deliver-close.
- cluster.mcastControl(ClusterConnectionDeliverCloseBody(), this);
- ++mcastSeq;
+ else {
+ // Local network connection has closed. We need to keep the
+ // connection around but replace the output handler with a
+ // no-op handler as the network output handler will be
+ // deleted.
+ output.setOutputHandler(discardHandler);
+
+ if (isLocal()) {
+ // This was a local replicated connection. Multicast a deliver
+ // closed and process any outstanding frames from the cluster
+ // until self-delivery of deliver-close.
+ cluster.mcastControl(ClusterConnectionDeliverCloseBody(), this);
+ ++mcastSeq;
+ }
}
}
catch (const std::exception& e) {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp?rev=699456&r1=699455&r2=699456&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp Fri Sep 26 12:22:00 2008
@@ -55,10 +55,8 @@
void JoiningHandler::update(const MemberId&, const framing::FieldTable& members, uint64_t dumper) {
Mutex::ScopedLock l(cluster.lock);
- cluster.map.update(members, dumper);
- QPID_LOG(debug, "Cluster update: " << cluster.map);
+ if (cluster.map.update(members, dumper)) cluster.updateMemberStats();
checkDumpRequest();
- cluster.updateMemberStats();
}
void JoiningHandler::checkDumpRequest() {
@@ -99,9 +97,10 @@
}
}
-void JoiningHandler::ready(const MemberId& id, const std::string& url) {
+void JoiningHandler::ready(const MemberId& id, const std::string& urlStr) {
Mutex::ScopedLock l(cluster.lock);
- cluster.map.ready(id, Url(url));
+ if (cluster.map.ready(id, Url(urlStr)))
+ cluster.updateMemberStats();
checkDumpRequest();
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp?rev=699456&r1=699455&r2=699456&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp Fri Sep 26 12:22:00 2008
@@ -52,10 +52,8 @@
cluster.connectionEventQueue.push(e);
}
-void MemberHandler::update(const MemberId&, const framing::FieldTable& , uint64_t) {
- Mutex::ScopedLock l(cluster.lock);
- cluster.updateMemberStats();
-}
+// Updates are for new joiners.
+void MemberHandler::update(const MemberId&, const framing::FieldTable& , uint64_t) {}
void MemberHandler::dumpRequest(const MemberId& dumpee, const std::string& urlStr) {
Mutex::ScopedLock l(cluster.lock);
@@ -75,8 +73,9 @@
boost::bind(&MemberHandler::dumpError, this, _1)));
}
-void MemberHandler::ready(const MemberId& id, const std::string& url) {
- cluster.map.ready(id, Url(url));
+void MemberHandler::ready(const MemberId& id, const std::string& urlStr) {
+ if (cluster.map.ready(id, Url(urlStr)))
+ cluster.updateMemberStats();
}