You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/02/27 20:34:47 UTC
svn commit: r748651 - in /qpid/trunk/qpid/cpp/src/qpid/cluster: Cluster.cpp
Cluster.h
Author: aconway
Date: Fri Feb 27 19:34:47 2009
New Revision: 748651
URL: http://svn.apache.org/viewvc?rev=748651&view=rev
Log:
cluster: apply membership updates while in CATCHUP mode.
Modified:
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=748651&r1=748650&r2=748651&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Feb 27 19:34:47 2009
@@ -234,9 +234,9 @@
// Check for deliver close here so we can erase the
// connection decoder safely in this thread.
if (frame.getMethod()->isA<ClusterConnectionDeliverCloseBody>())
- decoder.erase(e.getConnectionId());
+ decoder.erase(e.getConnectionId());
deliverFrameQueue.push(EventFrame(e, frame));
- }
+ }
}
else if (e.getType() == DATA)
decoder.decode(e, e.getData());
@@ -345,7 +345,7 @@
broker.getLinks().setPassive(true);
}
}
- else if (state >= READY && memberChange) {
+ else if (state >= CATCHUP && memberChange) {
memberUpdate(l);
elders = ClusterMap::intersection(elders, map.getAlive());
if (elders.empty()) {
@@ -357,7 +357,7 @@
bool Cluster::isLeader() const { return elders.empty(); }
-void Cluster::tryMakeOffer(const MemberId& id, Lock& ) {
+void Cluster::makeOffer(const MemberId& id, Lock& ) {
if (state == READY && map.isJoiner(id)) {
state = OFFER;
QPID_LOG(info, *this << " send update-offer to " << id);
@@ -382,7 +382,7 @@
void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l) {
map.updateRequest(id, url);
- tryMakeOffer(id, l);
+ makeOffer(id, l);
}
void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
@@ -406,7 +406,7 @@
else { // Another offer was first.
setReady(l);
QPID_LOG(info, *this << " cancelled update offer to " << updatee);
- tryMakeOffer(map.firstJoiner(), l); // Maybe make another offer.
+ makeOffer(map.firstJoiner(), l); // Maybe make another offer.
}
}
else if (updatee == myId && url) {
@@ -446,7 +446,6 @@
}
void Cluster::checkUpdateIn(Lock& ) {
- if (state == LEFT) return;
if (state == UPDATEE && updatedMap) {
map = *updatedMap;
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), myId);
@@ -467,7 +466,7 @@
state = READY;
mcast.release();
deliverFrameQueue.start();
- tryMakeOffer(map.firstJoiner(), l); // Try another offer
+ makeOffer(map.firstJoiner(), l); // Try another offer
}
void Cluster::updateOutError(const std::exception& e) {
@@ -522,7 +521,7 @@
size_t size = urls.size();
failoverExchange->setUrls(urls);
- if (size == 1 && lastSize > 1 && state >= READY) {
+ if (size == 1 && lastSize > 1 && state >= CATCHUP) {
QPID_LOG(info, *this << " last broker standing, update queue policies");
lastBroker = true;
broker.getQueues().updateQueueClusterState(true);
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=748651&r1=748650&r2=748651&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Fri Feb 27 19:34:47 2009
@@ -119,7 +119,7 @@
std::vector<Url> getUrls(Lock&) const;
// Make an offer if we can - called in deliver thread.
- void tryMakeOffer(const MemberId&, Lock&);
+ void makeOffer(const MemberId&, Lock&);
// Called in main thread in ~Broker.
void brokerShutdown();
@@ -133,6 +133,8 @@
void configChange(const MemberId&, const std::string& addresses, Lock& l);
void messageExpired(const MemberId&, uint64_t, Lock& l);
void shutdown(const MemberId&, Lock&);
+
+ // Handlers for pollable queues.
void deliveredEvent(const Event&);
void deliveredFrame(const EventFrame&);
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org