You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/09/26 23:49:53 UTC

svn commit: r699513 - in /incubator/qpid/trunk/qpid/cpp/src/qpid/cluster: Cluster.cpp Cluster.h ClusterHandler.h ClusterMap.cpp Connection.cpp DumpClient.cpp JoiningHandler.cpp JoiningHandler.h MemberHandler.cpp MemberHandler.h

Author: aconway
Date: Fri Sep 26 14:49:52 2008
New Revision: 699513

URL: http://svn.apache.org/viewvc?rev=699513&view=rev
Log:
Clean up end-of-dump protocol for new cluster members.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=699513&r1=699512&r2=699513&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Sep 26 14:49:52 2008
@@ -91,9 +91,12 @@
 
 Cluster::~Cluster() {}
 
-void Cluster::insert(const boost::intrusive_ptr<Connection>& c) { handler->insert(c); }
+void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
+    Mutex::ScopedLock l(lock);
+    connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c));
+}
 
-void Cluster::catchUpClosed(const boost::intrusive_ptr<Connection>& c) { handler->catchUpClosed(c); }
+void Cluster::dumpComplete() { handler->dumpComplete(); }
 
 void Cluster::erase(ConnectionId id) {
     Mutex::ScopedLock l(lock);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=699513&r1=699512&r2=699513&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Fri Sep 26 14:49:52 2008
@@ -62,10 +62,10 @@
 
     virtual ~Cluster();
 
-    void insert(const boost::intrusive_ptr<Connection>&); // Insert a local connection
-    void erase(ConnectionId);          // Erase a connection.
-
-    void catchUpClosed(const boost::intrusive_ptr<Connection>&); // Insert a local connection
+    // FIXME aconway 2008-09-26: thread safety
+    void insert(const boost::intrusive_ptr<Connection>&); 
+    void erase(ConnectionId);       
+    void dumpComplete();
     
     /** Get the URLs of current cluster members. */
     std::vector<Url> getUrls() const;
@@ -94,8 +94,6 @@
 
     broker::Broker& getBroker();
 
-    void setDumpComplete();
-
     template <class F> void eachConnection(const F& f) {
         for (ConnectionMap::const_iterator i = connections.begin(); i != connections.end(); ++i)
             f(i->second);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h?rev=699513&r1=699512&r2=699513&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h Fri Sep 26 14:49:52 2008
@@ -59,8 +59,7 @@
                               cpg_address *left, int nLeft,
                               cpg_address *joined, int nJoined) = 0;
 
-    virtual void insert(const boost::intrusive_ptr<Connection>& c) = 0;
-    virtual void catchUpClosed(const boost::intrusive_ptr<Connection>& c) = 0;
+    virtual void dumpComplete()  = 0;
 
   protected:
     Cluster& cluster;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp?rev=699513&r1=699512&r2=699513&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp Fri Sep 26 14:49:52 2008
@@ -45,7 +45,7 @@
         changed = members.erase(*a) || changed;
     if (dumper && !isMember(dumper))
         dumper = MemberId();
-    QPID_LOG_IF(debug, changed, *this);
+    QPID_LOG_IF(debug, changed, "Members left. " << *this);
     return changed;
 }
 
@@ -66,7 +66,7 @@
         Url url(i->second->get<std::string>());
         changed = members.insert(Members::value_type(id, url)).second || changed;
     }
-    QPID_LOG_IF(debug, changed, *this);
+    QPID_LOG_IF(debug, changed, "Update: " << *this);
     return changed;
 }
 
@@ -94,12 +94,11 @@
     bool changed = members.insert(Members::value_type(id,url)).second;
     if (id == dumper) {
         dumper = MemberId();
-        QPID_LOG(info, id << " finished dump.");
+        QPID_LOG(info, id << " finished dump. " << *this);
     }
     else {
-        QPID_LOG(info, id << " joined cluster, url=" << url);
+        QPID_LOG(info, id << " joined, url=" << url << ". " << *this);
     }
-    QPID_LOG_IF(debug, changed, *this);
     return changed;
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=699513&r1=699512&r2=699513&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Fri Sep 26 14:49:52 2008
@@ -76,10 +76,13 @@
 void Connection::received(framing::AMQFrame& f) {
     QPID_LOG(trace, "RECV " << *this << ": " << f);
     if (isShadow()) {           
-        // Final close that completes catch-up for shadow connection.
+        // Intercept the close that completes catch-up for a shadow connection.
         if (catchUp && f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) { 
+            catchUp = false;
             AMQFrame ok(in_place<ConnectionCloseOkBody>());
+            cluster.insert(boost::intrusive_ptr<Connection>(this));
             connection.getOutput().send(ok);
+            output.setOutputHandler(discardHandler);
         }
         else
             QPID_LOG(warning, *this << " ignoring unexpected frame: " << f);
@@ -104,27 +107,13 @@
 void Connection::closed() {
     try {
         QPID_LOG(debug, "Connection closed " << *this);
-
-        if (catchUp) {
-            cluster.catchUpClosed(boost::intrusive_ptr<Connection>(this));
-            if (isShadow()) 
-                catchUp = false;
-            else {
-                connection.closed();
-                return;
-            }
-        }
-
-        // Local network connection has closed.  We need to keep the
-        // connection around but replace the output handler with a
-        // no-op handler as the network output handler will be
-        // deleted.
-        output.setOutputHandler(discardHandler);
-
-        if (isLocal() && !catchUp) {
+        if (catchUp) 
+            connection.closed();
+        else if (isLocal()) {
             // This was a local replicated connection. Multicast a deliver
             // closed and process any outstanding frames from the cluster
             // until self-delivery of deliver-close.
+            output.setOutputHandler(discardHandler);
             cluster.mcastControl(ClusterConnectionDeliverCloseBody(), this);
             ++mcastSeq;
         }
@@ -188,11 +177,10 @@
     ConnectionId shadow = ConnectionId(memberId, connectionId);
     QPID_LOG(debug, "Catch-up connection " << self << " becomes shadow " << shadow);
     self = shadow;
-    assert(isShadow());
 }
 
 void Connection::dumpComplete() {
-    // FIXME aconway 2008-09-18: use or remove.
+    cluster.dumpComplete();
 }
 
 bool Connection::isLocal() const { return self.first == cluster.getSelf() && self.second == this; }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp?rev=699513&r1=699512&r2=699513&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp Fri Sep 26 14:49:52 2008
@@ -59,6 +59,12 @@
 namespace arg=client::arg;
 using client::SessionBase_0_10Access;
 
+struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection {
+    ClusterConnectionProxy(client::Connection& c) :
+        AMQP_AllProxy::ClusterConnection(*client::ConnectionAccess::getImpl(c)) {}
+};
+
+
 // Create a connection with special version that marks it as a catch-up connection.
 client::Connection catchUpConnection() {
     client::Connection c;
@@ -101,7 +107,7 @@
     session.sync();
     session.close();
     donor.eachConnection(boost::bind(&DumpClient::dumpConnection, this, _1));
-    // FIXME aconway 2008-09-18: inidicate successful end-of-dump.
+    ClusterConnectionProxy(connection).dumpComplete();
     connection.close();
     QPID_LOG(debug,  donor.getSelf() << " dumped all state to " << receiver);
 }
@@ -160,10 +166,9 @@
     // authentication etc. See ConnectionSettings.
     shadowConnection.open(receiver, bc.getUserId());
     dumpConnection->getBrokerConnection().eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1));
-    boost::shared_ptr<client::ConnectionImpl> impl = client::ConnectionAccess::getImpl(shadowConnection);
-    AMQP_AllProxy::ClusterConnection proxy(*impl);
-    proxy.shadowReady(dumpConnection->getId().getMember(),
-                      reinterpret_cast<uint64_t>(dumpConnection->getId().getConnectionPtr()));
+    ClusterConnectionProxy(shadowConnection).shadowReady(
+        dumpConnection->getId().getMember(),
+        reinterpret_cast<uint64_t>(dumpConnection->getId().getConnectionPtr()));
     shadowConnection.close();
     QPID_LOG(debug, donor.getId() << " dumped connection " << *dumpConnection);
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp?rev=699513&r1=699512&r2=699513&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp Fri Sep 26 14:49:52 2008
@@ -105,27 +105,8 @@
     checkDumpRequest();
 }
 
-void JoiningHandler::insert(const boost::intrusive_ptr<Connection>& c) {
-    Mutex::ScopedLock l(cluster.lock);
-    if (c->isCatchUp()) {
-        ++catchUpConnections;
-        QPID_LOG(debug, "Catch-up connection " << *c << " started, total " << catchUpConnections);
-    }
-    cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c));
-}
-
-void JoiningHandler::catchUpClosed(const boost::intrusive_ptr<Connection>& c) {
-    Mutex::ScopedLock l(cluster.lock);
-    QPID_LOG(debug, "Catch-up complete for " << *c << ", remaining catch-ups: " << catchUpConnections-1);
-    if (c->isShadow()) 
-        cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c));
-    if (--catchUpConnections == 0)
-        dumpComplete();
-}
-
 void JoiningHandler::dumpComplete() {
-    // FIXME aconway 2008-09-18: need to detect incomplete dump.
-    // Called with lock  - volatile?
+    Mutex::ScopedLock l(cluster.lock);
     if (state == STALLED) {
         QPID_LOG(debug, cluster.self << " received dump and stalled at start point, unstalling.");
         cluster.ready();
@@ -135,6 +116,7 @@
         assert(state == DUMP_REQUESTED);
         state = DUMP_COMPLETE;
     }
+    // FIXME aconway 2008-09-18: need to detect incomplete dump.
 }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.h?rev=699513&r1=699512&r2=699513&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.h Fri Sep 26 14:49:52 2008
@@ -46,12 +46,10 @@
     void dumpRequest(const MemberId&, const std::string& url);
     void ready(const MemberId&, const std::string& url);
 
-    void insert(const boost::intrusive_ptr<Connection>& c);
-    void catchUpClosed(const boost::intrusive_ptr<Connection>& c);
+    void dumpComplete();
     
   private:
     void checkDumpRequest();
-    void dumpComplete();
 
     enum { START, DUMP_REQUESTED, STALLED, DUMP_COMPLETE } state;
     size_t catchUpConnections;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp?rev=699513&r1=699512&r2=699513&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp Fri Sep 26 14:49:52 2008
@@ -90,18 +90,6 @@
     dumpSent();
 }
 
-void MemberHandler::insert(const boost::intrusive_ptr<Connection>& c) {
-    Mutex::ScopedLock l(cluster.lock);
-    if (c->isCatchUp())         // Not allowed in member mode
-        c->getBrokerConnection().close(execution::ERROR_CODE_ILLEGAL_STATE, "Not in catch-up mode.");
-    else
-        cluster.connections[c->getId()] = c;
-}
-
-void MemberHandler::catchUpClosed(const boost::intrusive_ptr<Connection>& c) {
-    Mutex::ScopedLock l(cluster.lock);
-    QPID_LOG(warning, "Catch-up connection " << c << " closed in member mode");
-    assert(0);
-}
+void MemberHandler::dumpComplete() { assert(0); }
 
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h?rev=699513&r1=699512&r2=699513&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h Fri Sep 26 14:49:52 2008
@@ -52,8 +52,7 @@
     void dumpSent();
     void dumpError(const std::exception&);
 
-    void insert(const boost::intrusive_ptr<Connection>& c);
-    void catchUpClosed(const boost::intrusive_ptr<Connection>& );
+    void dumpComplete();
 
   public:
     sys::Thread dumpThread;