You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/06/16 22:32:05 UTC

svn commit: r955370 - in /qpid/trunk/qpid/cpp/src: qpid/cluster/Cluster.cpp qpid/cluster/Connection.cpp qpid/cluster/Connection.h qpid/cluster/OutputInterceptor.cpp qpid/cluster/UpdateClient.cpp tests/failover_soak.cpp

Author: aconway
Date: Wed Jun 16 20:32:04 2010
New Revision: 955370

URL: http://svn.apache.org/viewvc?rev=955370&view=rev
Log:
Bug 603835 - cluster_tests.test_management failing.

Clean up connections causing extra connection objects in the mangement agent map.
- update connection was not being closed.
- connections belonging to members that left the cluster were not fully cleaned up

Also fixed test errors making failover_soak fail sporadically.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
    qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=955370&r1=955369&r2=955370&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Jun 16 20:32:04 2010
@@ -361,7 +361,6 @@ void Cluster::erase(const ConnectionId& 
 
 // Called by Connection::deliverClose() in deliverFrameQueue thread.
 void Cluster::erase(const ConnectionId& id, Lock&) {
-    QPID_LOG(info, *this << " connection closed " << id);
     connections.erase(id);
     decoder.erase(id);
 }
@@ -1024,7 +1023,7 @@ void Cluster::memberUpdate(Lock& l) {
         ConnectionMap::iterator j = i++;
         MemberId m = j->second->getId().getMember();
         if (m != self && !map.isMember(m)) {
-            j->second->getBrokerConnection().closed();
+            j->second->close();
             erase(j->second->getId(), l);
         }
     }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=955370&r1=955369&r2=955370&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Wed Jun 16 20:32:04 2010
@@ -101,19 +101,18 @@ Connection::Connection(Cluster& c, sys::
     if (isLocalClient()) {
         // Local clients are announced to the cluster
         // and initialized when the announce is received.
-        QPID_LOG(info, "new client connection " << *this);
         giveReadCredit(cluster.getSettings().readMax); // Flow control
         init();
     }
     else {
         // Catch-up shadow connections initialized using nextShadow id.
         assert(catchUp);
-        QPID_LOG(info, "new catch-up connection " << *this);
-        connectionCtor.mgmtId = updateIn.nextShadowMgmtId;
+        if (!updateIn.nextShadowMgmtId.empty())
+            connectionCtor.mgmtId = updateIn.nextShadowMgmtId;
         updateIn.nextShadowMgmtId.clear();
         init();
     }
-
+    QPID_LOG(info, "incoming connection " << *this);
 }
 
 void Connection::setSecureConnection(broker::SecureConnection* sc) {
@@ -123,8 +122,6 @@ void Connection::setSecureConnection(bro
 
 void Connection::init() {
     connection = connectionCtor.construct();
-    QPID_LOG(debug, cluster << " initialized connection: " << *this
-             << " ssf=" << connection->getExternalSecuritySettings().ssf);
     if (isLocalClient()) {
         if (secureConnection) connection->setSecureConnection(secureConnection);
         // Actively send cluster-order frames from local node
@@ -171,7 +168,6 @@ void Connection::announce(
 
 Connection::~Connection() {
     if (connection.get()) connection->setErrorListener(0);
-    QPID_LOG(debug, cluster << " deleted connection: " << *this);
 }
 
 bool Connection::doOutput() {
@@ -250,16 +246,15 @@ void Connection::deliveredFrame(const Ev
 // A local connection is closed by the network layer.
 void Connection::closed() {
     try {
-        if (catchUp) {
+        if (isUpdated()) {
+            QPID_LOG(debug, cluster << " update connection closed " << *this);
+            close();
+        }
+        else if (catchUp) {
             QPID_LOG(critical, cluster << " catch-up connection closed prematurely " << *this);
             cluster.leave();
         }
-        else if (isUpdated()) {
-            QPID_LOG(debug, cluster << " closed update connection " << *this);
-            if (connection.get()) connection->closed();
-        }
         else if (isLocal()) {
-            QPID_LOG(debug, cluster << " local close of replicated connection " << *this);
             // This was a local replicated connection. Multicast a deliver
             // closed and process any outstanding frames from the cluster
             // until self-delivery of deliver-close.
@@ -275,15 +270,20 @@ void Connection::closed() {
 // Self-delivery of close message, close the connection.
 void Connection::deliverClose () {
     assert(!catchUp);
+    close();
+    cluster.erase(self);
+}
+
+// Close the connection 
+void Connection::close() {
     if (connection.get()) {
         connection->closed();
         // Ensure we delete the broker::Connection in the deliver thread.
         connection.reset();
     }
-    cluster.erase(self);
 }
 
-// The connection has been killed for misbehaving
+// The connection has been killed for misbehaving, called in connection thread.
 void Connection::abort() {
     if (connection.get()) {
         connection->abort();
@@ -424,7 +424,7 @@ void Connection::shadowReady(
     uint64_t memberId, uint64_t connectionId, const string& mgmtId,
     const string& username, const string& fragment, uint32_t sendMax)
 {
-    QPID_ASSERT(mgmtId == getBrokerConnection().getMgmtId());
+    QPID_ASSERT(mgmtId == getBrokerConnection()->getMgmtId());
     ConnectionId shadowId = ConnectionId(memberId, connectionId);
     QPID_LOG(debug, cluster << " catch-up connection " << *this
              << " becomes shadow " << shadowId);
@@ -442,13 +442,19 @@ void Connection::membership(const FieldT
     QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
     cluster.updateInDone(ClusterMap(joiners, members, frameSeq));
     updateIn.consumerNumbering.clear();
-    self.second = 0;        // Mark this as completed update connection.
+    closeUpdated();
 }
 
 void Connection::retractOffer() {
     QPID_LOG(info, cluster << " incoming update retracted on connection " << *this);
     cluster.updateInRetracted();
-    self.second = 0;        // Mark this as completed update connection.
+    closeUpdated();
+}
+
+void Connection::closeUpdated() {
+    self.second = 0;      // Mark this as completed update connection.
+    if (connection.get()) 
+        connection->close(connection::CLOSE_CODE_NORMAL, "OK");
 }
 
 bool Connection::isLocal() const {
@@ -527,7 +533,10 @@ std::ostream& operator<<(std::ostream& o
     if (c.isLocal()) type = "local";
     else if (c.isShadow()) type = "shadow";
     else if (c.isUpdated()) type = "updated";
-    return o << c.getId() << "(" << type << (c.isCatchUp() ? ",catchup" : "") << ")";
+    const broker::Connection* bc = c.getBrokerConnection();
+    if (bc) o << bc->getMgmtId();
+    else o << "<disconnected>";
+    return o << "(" << c.getId() << " " << type << (c.isCatchUp() ? ",catchup":"") << ")";
 }
 
 void Connection::txStart() {

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=955370&r1=955369&r2=955370&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Wed Jun 16 20:32:04 2010
@@ -75,7 +75,8 @@ class Connection :
     ~Connection();
     
     ConnectionId getId() const { return self; }
-    broker::Connection& getBrokerConnection() { return *connection; }
+    broker::Connection* getBrokerConnection() { return connection.get(); }
+    const broker::Connection* getBrokerConnection() const { return connection.get(); }
 
     /** Local connections may be clients or catch-up connections */
     bool isLocal() const;
@@ -167,6 +168,7 @@ class Connection :
     void announce(const std::string& mgmtId, uint32_t ssf, const std::string& authid,
                   bool nodict, const std::string& username,
                   const std::string& initFrames);
+    void close();
     void abort();
     void deliverClose();
 
@@ -227,6 +229,7 @@ class Connection :
     broker::SessionState& sessionState();
     broker::SemanticState& semanticState();
     broker::QueuedMessage getUpdateMessage();
+    void closeUpdated();
 
     Cluster& cluster;
     ConnectionId self;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=955370&r1=955369&r2=955370&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Wed Jun 16 20:32:04 2010
@@ -83,7 +83,7 @@ void OutputInterceptor::deliverDoOutput(
             newLimit = (sendMax + sent) / 2;
     }
     sent = 0;
-    while (sent < limit && parent.getBrokerConnection().doOutput())
+    while (sent < limit && parent.getBrokerConnection()->doOutput())
         ++sent;
     if (sent == limit) sendDoOutput(newLimit);
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=955370&r1=955369&r2=955370&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Wed Jun 16 20:32:04 2010
@@ -150,7 +150,8 @@ void UpdateClient::update() {
     // longer on their original queue.
     session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true);
     session.sync();
-    std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1));
+    std::for_each(connections.begin(), connections.end(),
+                  boost::bind(&UpdateClient::updateConnection, this, _1));
     session.queueDelete(arg::queue=UPDATE);
     session.close();
 
@@ -167,15 +168,18 @@ void UpdateClient::update() {
     client::ConnectionAccess::getImpl(connection)->expand(frame.encodedSize(), false);
     client::ConnectionAccess::getImpl(connection)->handle(frame);
 
-    connection.close();
-    QPID_LOG(debug,  updaterId << " update completed to " << updateeId
-             << " at " << updateeUrl << ": " << membership);
+    // FIXME aconway 2010-06-16: Connection will be closed from the other end.
+    // connection.close();
+    
     // FIXME aconway 2010-03-15: This sleep avoids the race condition
     // described in // https://bugzilla.redhat.com/show_bug.cgi?id=568831.
     // It allows the connection to fully close before destroying the
     // Connection object. Remove when the bug is fixed.
     //
-    sys::usleep(10*1000);       // 100ms
+    sys::usleep(10*1000);
+
+    QPID_LOG(debug,  updaterId << " update completed to " << updateeId
+             << " at " << updateeUrl << ": " << membership);
 }
 
 namespace {
@@ -347,9 +351,11 @@ void UpdateClient::updateOutputTask(cons
 
 void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& updateConnection) {
     QPID_LOG(debug, updaterId << " updating connection " << *updateConnection);
-
+    assert(updateConnection->getBrokerConnection());
+    broker::Connection& bc = *updateConnection->getBrokerConnection();
+    
     // Send the management ID first on the main connection.
-    std::string mgmtId = updateConnection->getBrokerConnection().getMgmtId();
+    std::string mgmtId = updateConnection->getBrokerConnection()->getMgmtId();
     ClusterConnectionProxy(session).shadowPrepare(mgmtId);
     // Make sure its received before opening shadow connection
     session.sync();
@@ -357,7 +363,6 @@ void UpdateClient::updateConnection(cons
     // Open shadow connection and update it.
     shadowConnection = catchUpConnection();
 
-    broker::Connection& bc = updateConnection->getBrokerConnection();
     connectionSettings.maxFrameSize = bc.getFrameMax();
     shadowConnection.open(updateeUrl, connectionSettings);
     bc.eachSessionHandler(boost::bind(&UpdateClient::updateSession, this, _1));
@@ -381,8 +386,7 @@ void UpdateClient::updateSession(broker:
     broker::SessionState* ss = sh.getSession();
     if (!ss) return;            // no session.
 
-    QPID_LOG(debug, updaterId << " updating session " << &sh.getConnection()
-             << "[" << sh.getChannel() << "] = " << ss->getId());
+    QPID_LOG(debug, updaterId << " updating session " << ss->getId());
 
     // Create a client session to update session state. 
     boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection);

Modified: qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp?rev=955370&r1=955369&r2=955370&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp Wed Jun 16 20:32:04 2010
@@ -673,7 +673,7 @@ main ( int argc, char const ** argv )
      // Get prefix for each queue name.
      stringstream queue_prefix;
      queue_prefix << "failover_soak_" << getpid();
-
+     string queue_prefix_str(queue_prefix.str());
 
      // Run the declareQueues child.
      int childStatus;
@@ -683,7 +683,7 @@ main ( int argc, char const ** argv )
                               declareQueuesPath, 
                               verbosity, 
                               durable,
-                              queue_prefix.str().c_str(),
+                              queue_prefix_str.c_str(),
                               n_queues
                             );
      if ( -1 == dqClientPid ) {
@@ -707,6 +707,7 @@ main ( int argc, char const ** argv )
 
          stringstream queue_name;
          queue_name << queue_prefix.str() << '_' << i;
+         string queue_name_str(queue_name.str());
 
          // Receiving client ---------------------------
          pid_t receivingClientPid =
@@ -715,7 +716,7 @@ main ( int argc, char const ** argv )
                                   receiverPath,
                                   reportFrequency,
                                   verbosity,
-                                  queue_name.str().c_str() );
+                                  queue_name_str.c_str() );
          if ( -1 == receivingClientPid ) {
              cerr << "END_OF_TEST ERROR_START_RECEIVER\n";
              return CANT_FORK_RECEIVER;
@@ -731,7 +732,7 @@ main ( int argc, char const ** argv )
                                 reportFrequency,
                                 verbosity,
                                 durable,
-                                queue_name.str().c_str() );
+                                queue_name_str.c_str() );
          if ( -1 == sendingClientPid ) {
              cerr << "END_OF_TEST ERROR_START_SENDER\n";
              return CANT_FORK_SENDER;



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org