You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/08/16 00:40:31 UTC

svn commit: r686409 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/cluster/Cluster.cpp qpid/cluster/Cluster.h qpid/cluster/ConnectionInterceptor.cpp qpid/cluster/ConnectionInterceptor.h tests/ais_check

Author: aconway
Date: Fri Aug 15 15:40:30 2008
New Revision: 686409

URL: http://svn.apache.org/viewvc?rev=686409&view=rev
Log:
Fix memory leak in Cluster and enable valgrind in ais_check

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h
    incubator/qpid/trunk/qpid/cpp/src/tests/ais_check

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=686409&r1=686408&r2=686409&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Aug 15 15:40:30 2008
@@ -87,13 +87,21 @@
     mcastQueue.start(poller);
 }
 
-Cluster::~Cluster() {}
+Cluster::~Cluster() {
+    for (ShadowConnectionMap::iterator i = shadowConnectionMap.begin();
+         i != shadowConnectionMap.end();
+         ++i)
+    {
+        i->second->dirtyClose(); 
+    }
+    std::for_each(localConnectionSet.begin(), localConnectionSet.end(), boost::bind(&ConnectionInterceptor::dirtyClose, _1));
+}
 
 // local connection initializes plugins
 void Cluster::initialize(broker::Connection& c) {
     bool isLocal = &c.getOutput() != &shadowOut;
     if (isLocal)
-        new ConnectionInterceptor(c, *this);
+        localConnectionSet.insert(new ConnectionInterceptor(c, *this));
 }
 
 void Cluster::leave() {
@@ -260,6 +268,8 @@
       case CLUSTER_CONNECTION_CLOSE_METHOD_ID: {
           if (!connection->isLocal())
               shadowConnectionMap.erase(connection->getShadowId());
+          else
+              localConnectionSet.erase(connection);
           connection->deliverClosed();
           break;
       }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=686409&r1=686408&r2=686409&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Fri Aug 15 15:40:30 2008
@@ -91,11 +91,12 @@
     // Cluster frame handing functions
     void notify(const std::string& url);
     void connectionClose();
-    
+
   private:
     typedef Cpg::Id Id;
     typedef std::map<Id, Member>  MemberMap;
     typedef std::map<ShadowConnectionId, ConnectionInterceptor*> ShadowConnectionMap;
+    typedef std::set<ConnectionInterceptor*> LocalConnectionSet;
 
     /** Message sent over the cluster. */
     struct Message {
@@ -154,6 +155,7 @@
     MemberMap members;
     Id self;
     ShadowConnectionMap shadowConnectionMap;
+    LocalConnectionSet localConnectionSet;
     ShadowConnectionOutputHandler shadowOut;
     sys::DispatchHandle cpgDispatchHandle;
     MessageQueue deliverQueue;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp?rev=686409&r1=686408&r2=686409&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp Fri Aug 15 15:40:30 2008
@@ -42,7 +42,6 @@
 }
 
 ConnectionInterceptor::~ConnectionInterceptor() {
-    assert(isClosed);
     assert(connection == 0);
 }
 
@@ -52,7 +51,6 @@
 }
 
 void ConnectionInterceptor::deliver(framing::AMQFrame& f) {
-    //    ostringstream os; os << f; printf("Received: %s\n", os.str().c_str()); // FIXME aconway 2008-08-08: remove
     receivedNext(f);
 }
 
@@ -82,9 +80,19 @@
     connection = 0;             
 }
 
+void ConnectionInterceptor::dirtyClose() {
+    // Not closed via cluster self-delivery but closed locally.
+    // Used for dirty cluster shutdown where active connections
+    // must be cleaned up.
+    connection = 0;
+}
+
 bool  ConnectionInterceptor::doOutput() {
+    // FIXME aconway 2008-08-15: this is not correct.
+    // Run in write threads so order of execution of doOutput is not determinate.
+    // Will only work reliably for in single-consumer tests.   
+
     if (connection->hasOutput()) {
-        QPID_LOG(debug, "Intercept doOutput, call doOutputNext"); // FIXME aconway 2008-08-08: remove
         cluster.send(AMQFrame(in_place<ClusterConnectionDoOutputBody>()), this);
         return doOutputNext();
     }
@@ -92,13 +100,9 @@
 }
 
 void ConnectionInterceptor::deliverDoOutput() {
-    if (isShadow()) {
-        QPID_LOG(debug, "Shadow deliver do output, call doOutputNext"); // FIXME aconway 2008-08-08: remove
+    // FIXME aconway 2008-08-15: see comment in doOutput.
+    if (isShadow()) 
         doOutputNext();
-    }
-    else {
-        QPID_LOG(debug, "Primary deliver doOutput, ignore."); // FIXME aconway 2008-08-08: remove
-    }
 }
 
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h?rev=686409&r1=686408&r2=686409&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h Fri Aug 15 15:40:30 2008
@@ -48,6 +48,8 @@
     void deliverClosed();
     void deliverDoOutput();
 
+    void dirtyClose();
+
   private:
     struct NullConnectionHandler : public qpid::sys::ConnectionOutputHandler {
         void close() {}

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ais_check
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ais_check?rev=686409&r1=686408&r2=686409&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ais_check (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ais_check Fri Aug 15 15:40:30 2008
@@ -1,4 +1,5 @@
 #!/bin/sh
+srcdir=`dirname $0`
 
 # Check AIS requirements tests if found.
 id -nG | grep '\<ais\>' >/dev/null || \
@@ -31,6 +32,6 @@
 
 # Run the tests
 srcdir=`dirname $0`
-with_ais_group ./cluster_test || ERROR=1
+with_ais_group $srcdir/run_test ./cluster_test || ERROR=1
 exit $ERROR