You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/08/16 00:40:31 UTC
svn commit: r686409 - in /incubator/qpid/trunk/qpid/cpp/src:
qpid/cluster/Cluster.cpp qpid/cluster/Cluster.h
qpid/cluster/ConnectionInterceptor.cpp qpid/cluster/ConnectionInterceptor.h
tests/ais_check
Author: aconway
Date: Fri Aug 15 15:40:30 2008
New Revision: 686409
URL: http://svn.apache.org/viewvc?rev=686409&view=rev
Log:
Fix memory leak in Cluster and enable valgrind in ais_check
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h
incubator/qpid/trunk/qpid/cpp/src/tests/ais_check
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=686409&r1=686408&r2=686409&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Aug 15 15:40:30 2008
@@ -87,13 +87,21 @@
mcastQueue.start(poller);
}
-Cluster::~Cluster() {}
+Cluster::~Cluster() {
+ for (ShadowConnectionMap::iterator i = shadowConnectionMap.begin();
+ i != shadowConnectionMap.end();
+ ++i)
+ {
+ i->second->dirtyClose();
+ }
+ std::for_each(localConnectionSet.begin(), localConnectionSet.end(), boost::bind(&ConnectionInterceptor::dirtyClose, _1));
+}
// local connection initializes plugins
void Cluster::initialize(broker::Connection& c) {
bool isLocal = &c.getOutput() != &shadowOut;
if (isLocal)
- new ConnectionInterceptor(c, *this);
+ localConnectionSet.insert(new ConnectionInterceptor(c, *this));
}
void Cluster::leave() {
@@ -260,6 +268,8 @@
case CLUSTER_CONNECTION_CLOSE_METHOD_ID: {
if (!connection->isLocal())
shadowConnectionMap.erase(connection->getShadowId());
+ else
+ localConnectionSet.erase(connection);
connection->deliverClosed();
break;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=686409&r1=686408&r2=686409&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Fri Aug 15 15:40:30 2008
@@ -91,11 +91,12 @@
// Cluster frame handling functions
void notify(const std::string& url);
void connectionClose();
-
+
private:
typedef Cpg::Id Id;
typedef std::map<Id, Member> MemberMap;
typedef std::map<ShadowConnectionId, ConnectionInterceptor*> ShadowConnectionMap;
+ typedef std::set<ConnectionInterceptor*> LocalConnectionSet;
/** Message sent over the cluster. */
struct Message {
@@ -154,6 +155,7 @@
MemberMap members;
Id self;
ShadowConnectionMap shadowConnectionMap;
+ LocalConnectionSet localConnectionSet;
ShadowConnectionOutputHandler shadowOut;
sys::DispatchHandle cpgDispatchHandle;
MessageQueue deliverQueue;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp?rev=686409&r1=686408&r2=686409&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp Fri Aug 15 15:40:30 2008
@@ -42,7 +42,6 @@
}
ConnectionInterceptor::~ConnectionInterceptor() {
- assert(isClosed);
assert(connection == 0);
}
@@ -52,7 +51,6 @@
}
void ConnectionInterceptor::deliver(framing::AMQFrame& f) {
- // ostringstream os; os << f; printf("Received: %s\n", os.str().c_str()); // FIXME aconway 2008-08-08: remove
receivedNext(f);
}
@@ -82,9 +80,19 @@
connection = 0;
}
+void ConnectionInterceptor::dirtyClose() {
+ // Not closed via cluster self-delivery but closed locally.
+ // Used for dirty cluster shutdown where active connections
+ // must be cleaned up.
+ connection = 0;
+}
+
bool ConnectionInterceptor::doOutput() {
+ // FIXME aconway 2008-08-15: this is not correct.
+ // Runs in write threads, so the order of execution of doOutput is not deterministic.
+ // Will only work reliably in single-consumer tests.
+
if (connection->hasOutput()) {
- QPID_LOG(debug, "Intercept doOutput, call doOutputNext"); // FIXME aconway 2008-08-08: remove
cluster.send(AMQFrame(in_place<ClusterConnectionDoOutputBody>()), this);
return doOutputNext();
}
@@ -92,13 +100,9 @@
}
void ConnectionInterceptor::deliverDoOutput() {
- if (isShadow()) {
- QPID_LOG(debug, "Shadow deliver do output, call doOutputNext"); // FIXME aconway 2008-08-08: remove
+ // FIXME aconway 2008-08-15: see comment in doOutput.
+ if (isShadow())
doOutputNext();
- }
- else {
- QPID_LOG(debug, "Primary deliver doOutput, ignore."); // FIXME aconway 2008-08-08: remove
- }
}
}} // namespace qpid::cluster
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h?rev=686409&r1=686408&r2=686409&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h Fri Aug 15 15:40:30 2008
@@ -48,6 +48,8 @@
void deliverClosed();
void deliverDoOutput();
+ void dirtyClose();
+
private:
struct NullConnectionHandler : public qpid::sys::ConnectionOutputHandler {
void close() {}
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ais_check
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ais_check?rev=686409&r1=686408&r2=686409&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ais_check (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ais_check Fri Aug 15 15:40:30 2008
@@ -1,4 +1,5 @@
#!/bin/sh
+srcdir=`dirname $0`
# Check AIS requirements tests if found.
id -nG | grep '\<ais\>' >/dev/null || \
@@ -31,6 +32,6 @@
# Run the tests
srcdir=`dirname $0`
-with_ais_group ./cluster_test || ERROR=1
+with_ais_group $srcdir/run_test ./cluster_test || ERROR=1
exit $ERROR