You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/09/26 23:49:53 UTC
svn commit: r699513 - in /incubator/qpid/trunk/qpid/cpp/src/qpid/cluster:
Cluster.cpp Cluster.h ClusterHandler.h ClusterMap.cpp Connection.cpp
DumpClient.cpp JoiningHandler.cpp JoiningHandler.h MemberHandler.cpp
MemberHandler.h
Author: aconway
Date: Fri Sep 26 14:49:52 2008
New Revision: 699513
URL: http://svn.apache.org/viewvc?rev=699513&view=rev
Log:
Clean up end-of-dump protocol for new cluster members.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=699513&r1=699512&r2=699513&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Sep 26 14:49:52 2008
@@ -91,9 +91,12 @@
Cluster::~Cluster() {}
-void Cluster::insert(const boost::intrusive_ptr<Connection>& c) { handler->insert(c); }
+void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
+ Mutex::ScopedLock l(lock);
+ connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c));
+}
-void Cluster::catchUpClosed(const boost::intrusive_ptr<Connection>& c) { handler->catchUpClosed(c); }
+void Cluster::dumpComplete() { handler->dumpComplete(); }
void Cluster::erase(ConnectionId id) {
Mutex::ScopedLock l(lock);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=699513&r1=699512&r2=699513&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Fri Sep 26 14:49:52 2008
@@ -62,10 +62,10 @@
virtual ~Cluster();
- void insert(const boost::intrusive_ptr<Connection>&); // Insert a local connection
- void erase(ConnectionId); // Erase a connection.
-
- void catchUpClosed(const boost::intrusive_ptr<Connection>&); // Insert a local connection
+ // FIXME aconway 2008-09-26: thread safety
+ void insert(const boost::intrusive_ptr<Connection>&);
+ void erase(ConnectionId);
+ void dumpComplete();
/** Get the URLs of current cluster members. */
std::vector<Url> getUrls() const;
@@ -94,8 +94,6 @@
broker::Broker& getBroker();
- void setDumpComplete();
-
template <class F> void eachConnection(const F& f) {
for (ConnectionMap::const_iterator i = connections.begin(); i != connections.end(); ++i)
f(i->second);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h?rev=699513&r1=699512&r2=699513&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h Fri Sep 26 14:49:52 2008
@@ -59,8 +59,7 @@
cpg_address *left, int nLeft,
cpg_address *joined, int nJoined) = 0;
- virtual void insert(const boost::intrusive_ptr<Connection>& c) = 0;
- virtual void catchUpClosed(const boost::intrusive_ptr<Connection>& c) = 0;
+ virtual void dumpComplete() = 0;
protected:
Cluster& cluster;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp?rev=699513&r1=699512&r2=699513&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp Fri Sep 26 14:49:52 2008
@@ -45,7 +45,7 @@
changed = members.erase(*a) || changed;
if (dumper && !isMember(dumper))
dumper = MemberId();
- QPID_LOG_IF(debug, changed, *this);
+ QPID_LOG_IF(debug, changed, "Members left. " << *this);
return changed;
}
@@ -66,7 +66,7 @@
Url url(i->second->get<std::string>());
changed = members.insert(Members::value_type(id, url)).second || changed;
}
- QPID_LOG_IF(debug, changed, *this);
+ QPID_LOG_IF(debug, changed, "Update: " << *this);
return changed;
}
@@ -94,12 +94,11 @@
bool changed = members.insert(Members::value_type(id,url)).second;
if (id == dumper) {
dumper = MemberId();
- QPID_LOG(info, id << " finished dump.");
+ QPID_LOG(info, id << " finished dump. " << *this);
}
else {
- QPID_LOG(info, id << " joined cluster, url=" << url);
+ QPID_LOG(info, id << " joined, url=" << url << ". " << *this);
}
- QPID_LOG_IF(debug, changed, *this);
return changed;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=699513&r1=699512&r2=699513&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Fri Sep 26 14:49:52 2008
@@ -76,10 +76,13 @@
void Connection::received(framing::AMQFrame& f) {
QPID_LOG(trace, "RECV " << *this << ": " << f);
if (isShadow()) {
- // Final close that completes catch-up for shadow connection.
+ // Intercept the close that completes catch-up for shadow a connection.
if (catchUp && f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) {
+ catchUp = false;
AMQFrame ok(in_place<ConnectionCloseOkBody>());
+ cluster.insert(boost::intrusive_ptr<Connection>(this));
connection.getOutput().send(ok);
+ output.setOutputHandler(discardHandler);
}
else
QPID_LOG(warning, *this << " ignoring unexpected frame: " << f);
@@ -104,27 +107,13 @@
void Connection::closed() {
try {
QPID_LOG(debug, "Connection closed " << *this);
-
- if (catchUp) {
- cluster.catchUpClosed(boost::intrusive_ptr<Connection>(this));
- if (isShadow())
- catchUp = false;
- else {
- connection.closed();
- return;
- }
- }
-
- // Local network connection has closed. We need to keep the
- // connection around but replace the output handler with a
- // no-op handler as the network output handler will be
- // deleted.
- output.setOutputHandler(discardHandler);
-
- if (isLocal() && !catchUp) {
+ if (catchUp)
+ connection.closed();
+ else if (isLocal()) {
// This was a local replicated connection. Multicast a deliver
// closed and process any outstanding frames from the cluster
// until self-delivery of deliver-close.
+ output.setOutputHandler(discardHandler);
cluster.mcastControl(ClusterConnectionDeliverCloseBody(), this);
++mcastSeq;
}
@@ -188,11 +177,10 @@
ConnectionId shadow = ConnectionId(memberId, connectionId);
QPID_LOG(debug, "Catch-up connection " << self << " becomes shadow " << shadow);
self = shadow;
- assert(isShadow());
}
void Connection::dumpComplete() {
- // FIXME aconway 2008-09-18: use or remove.
+ cluster.dumpComplete();
}
bool Connection::isLocal() const { return self.first == cluster.getSelf() && self.second == this; }
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp?rev=699513&r1=699512&r2=699513&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp Fri Sep 26 14:49:52 2008
@@ -59,6 +59,12 @@
namespace arg=client::arg;
using client::SessionBase_0_10Access;
+struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection {
+ ClusterConnectionProxy(client::Connection& c) :
+ AMQP_AllProxy::ClusterConnection(*client::ConnectionAccess::getImpl(c)) {}
+};
+
+
// Create a connection with special version that marks it as a catch-up connection.
client::Connection catchUpConnection() {
client::Connection c;
@@ -101,7 +107,7 @@
session.sync();
session.close();
donor.eachConnection(boost::bind(&DumpClient::dumpConnection, this, _1));
- // FIXME aconway 2008-09-18: inidicate successful end-of-dump.
+ ClusterConnectionProxy(connection).dumpComplete();
connection.close();
QPID_LOG(debug, donor.getSelf() << " dumped all state to " << receiver);
}
@@ -160,10 +166,9 @@
// authentication etc. See ConnectionSettings.
shadowConnection.open(receiver, bc.getUserId());
dumpConnection->getBrokerConnection().eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1));
- boost::shared_ptr<client::ConnectionImpl> impl = client::ConnectionAccess::getImpl(shadowConnection);
- AMQP_AllProxy::ClusterConnection proxy(*impl);
- proxy.shadowReady(dumpConnection->getId().getMember(),
- reinterpret_cast<uint64_t>(dumpConnection->getId().getConnectionPtr()));
+ ClusterConnectionProxy(shadowConnection).shadowReady(
+ dumpConnection->getId().getMember(),
+ reinterpret_cast<uint64_t>(dumpConnection->getId().getConnectionPtr()));
shadowConnection.close();
QPID_LOG(debug, donor.getId() << " dumped connection " << *dumpConnection);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp?rev=699513&r1=699512&r2=699513&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp Fri Sep 26 14:49:52 2008
@@ -105,27 +105,8 @@
checkDumpRequest();
}
-void JoiningHandler::insert(const boost::intrusive_ptr<Connection>& c) {
- Mutex::ScopedLock l(cluster.lock);
- if (c->isCatchUp()) {
- ++catchUpConnections;
- QPID_LOG(debug, "Catch-up connection " << *c << " started, total " << catchUpConnections);
- }
- cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c));
-}
-
-void JoiningHandler::catchUpClosed(const boost::intrusive_ptr<Connection>& c) {
- Mutex::ScopedLock l(cluster.lock);
- QPID_LOG(debug, "Catch-up complete for " << *c << ", remaining catch-ups: " << catchUpConnections-1);
- if (c->isShadow())
- cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c));
- if (--catchUpConnections == 0)
- dumpComplete();
-}
-
void JoiningHandler::dumpComplete() {
- // FIXME aconway 2008-09-18: need to detect incomplete dump.
- // Called with lock - volatile?
+ Mutex::ScopedLock l(cluster.lock);
if (state == STALLED) {
QPID_LOG(debug, cluster.self << " received dump and stalled at start point, unstalling.");
cluster.ready();
@@ -135,6 +116,7 @@
assert(state == DUMP_REQUESTED);
state = DUMP_COMPLETE;
}
+ // FIXME aconway 2008-09-18: need to detect incomplete dump.
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.h?rev=699513&r1=699512&r2=699513&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.h Fri Sep 26 14:49:52 2008
@@ -46,12 +46,10 @@
void dumpRequest(const MemberId&, const std::string& url);
void ready(const MemberId&, const std::string& url);
- void insert(const boost::intrusive_ptr<Connection>& c);
- void catchUpClosed(const boost::intrusive_ptr<Connection>& c);
+ void dumpComplete();
private:
void checkDumpRequest();
- void dumpComplete();
enum { START, DUMP_REQUESTED, STALLED, DUMP_COMPLETE } state;
size_t catchUpConnections;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp?rev=699513&r1=699512&r2=699513&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp Fri Sep 26 14:49:52 2008
@@ -90,18 +90,6 @@
dumpSent();
}
-void MemberHandler::insert(const boost::intrusive_ptr<Connection>& c) {
- Mutex::ScopedLock l(cluster.lock);
- if (c->isCatchUp()) // Not allowed in member mode
- c->getBrokerConnection().close(execution::ERROR_CODE_ILLEGAL_STATE, "Not in catch-up mode.");
- else
- cluster.connections[c->getId()] = c;
-}
-
-void MemberHandler::catchUpClosed(const boost::intrusive_ptr<Connection>& c) {
- Mutex::ScopedLock l(cluster.lock);
- QPID_LOG(warning, "Catch-up connection " << c << " closed in member mode");
- assert(0);
-}
+void MemberHandler::dumpComplete() { assert(0); }
}} // namespace qpid::cluster
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h?rev=699513&r1=699512&r2=699513&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h Fri Sep 26 14:49:52 2008
@@ -52,8 +52,7 @@
void dumpSent();
void dumpError(const std::exception&);
- void insert(const boost::intrusive_ptr<Connection>& c);
- void catchUpClosed(const boost::intrusive_ptr<Connection>& );
+ void dumpComplete();
public:
sys::Thread dumpThread;