You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/03/30 00:30:50 UTC
svn commit: r523857 - in /incubator/qpid/trunk/qpid/cpp: lib/broker/ tests/
Author: aconway
Date: Thu Mar 29 15:30:48 2007
New Revision: 523857
URL: http://svn.apache.org/viewvc?view=rev&rev=523857
Log:
Fixed memory leak: removed Binding and ExchangeBinding.
These classes unbind a deleted queue from any Exchanges.
But Exchanges hold shared_ptr<Queue>, so queues are never deleted while
the exchange exists. Moreover, a queue and its bindings form a shared_ptr
cycle, causing a leak.
Raised QPID-438 for the remaining problem: destroyed queues are never
unbound or deleted.
Removed:
incubator/qpid/trunk/qpid/cpp/lib/broker/Binding.h
incubator/qpid/trunk/qpid/cpp/lib/broker/ExchangeBinding.cpp
incubator/qpid/trunk/qpid/cpp/lib/broker/ExchangeBinding.h
Modified:
incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp
incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.h
incubator/qpid/trunk/qpid/cpp/lib/broker/DirectExchange.cpp
incubator/qpid/trunk/qpid/cpp/lib/broker/FanOutExchange.cpp
incubator/qpid/trunk/qpid/cpp/lib/broker/HeadersExchange.cpp
incubator/qpid/trunk/qpid/cpp/lib/broker/Makefile.am
incubator/qpid/trunk/qpid/cpp/lib/broker/TopicExchange.cpp
incubator/qpid/trunk/qpid/cpp/tests/QueueTest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp?view=diff&rev=523857&r1=523856&r2=523857
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp Thu Mar 29 15:30:48 2007
@@ -50,16 +50,7 @@
if(autodelete) lastUsed = now()/TIME_MSEC;
}
-Queue::~Queue(){
- for(Binding* b = bindings.front(); !bindings.empty(); b = bindings.front()){
- b->cancel();
- bindings.pop();
- }
-}
-
-void Queue::bound(Binding* b){
- bindings.push(b);
-}
+Queue::~Queue(){}
void Queue::deliver(Message::shared_ptr& msg){
enqueue(0, msg, 0);
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.h?view=diff&rev=523857&r1=523856&r2=523857
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.h Thu Mar 29 15:30:48 2007
@@ -26,7 +26,6 @@
#include <queue>
#include <boost/shared_ptr.hpp>
#include <amqp_types.h>
-#include <Binding.h>
#include <ConnectionToken.h>
#include <Consumer.h>
#include <BrokerMessage.h>
@@ -54,7 +53,6 @@
*/
class Queue{
typedef std::vector<Consumer*> Consumers;
- typedef std::queue<Binding*> Bindings;
typedef std::queue<Message::shared_ptr> Messages;
const string name;
@@ -62,7 +60,6 @@
MessageStore* const store;
const ConnectionToken* const owner;
Consumers consumers;
- Bindings bindings;
Messages messages;
bool queueing;
bool dispatching;
@@ -93,11 +90,6 @@
void create(const qpid::framing::FieldTable& settings);
void configure(const qpid::framing::FieldTable& settings);
void destroy();
- /**
- * Informs the queue of a binding that should be cancelled on
- * destruction of the queue.
- */
- void bound(Binding* b);
/**
* Delivers a message to the queue. Will record it as
* enqueued if persistent then process it.
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/DirectExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/DirectExchange.cpp?view=diff&rev=523857&r1=523856&r2=523857
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/DirectExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/DirectExchange.cpp Thu Mar 29 15:30:48 2007
@@ -19,7 +19,6 @@
*
*/
#include <DirectExchange.h>
-#include <ExchangeBinding.h>
#include <iostream>
using namespace qpid::broker;
@@ -30,13 +29,12 @@
}
-void DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args){
+void DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable*){
Mutex::ScopedLock l(lock);
std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
if(i == queues.end()){
bindings[routingKey].push_back(queue);
- queue->bound(new ExchangeBinding(this, queue, routingKey, args));
}
}
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/FanOutExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/FanOutExchange.cpp?view=diff&rev=523857&r1=523856&r2=523857
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/FanOutExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/FanOutExchange.cpp Thu Mar 29 15:30:48 2007
@@ -19,7 +19,6 @@
*
*/
#include <FanOutExchange.h>
-#include <ExchangeBinding.h>
#include <algorithm>
using namespace qpid::broker;
@@ -28,13 +27,12 @@
FanOutExchange::FanOutExchange(const std::string& _name) : Exchange(_name) {}
-void FanOutExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args){
+void FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){
Mutex::ScopedLock locker(lock);
// Add if not already present.
Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
if (i == bindings.end()) {
bindings.push_back(queue);
- queue->bound(new ExchangeBinding(this, queue, routingKey, args));
}
}
@@ -43,8 +41,6 @@
Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
if (i != bindings.end()) {
bindings.erase(i);
- // TODO aconway 2006-09-14: What about the ExchangeBinding object?
- // Don't we have to verify routingKey/args match?
}
}
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/HeadersExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/HeadersExchange.cpp?view=diff&rev=523857&r1=523856&r2=523857
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/HeadersExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/HeadersExchange.cpp Thu Mar 29 15:30:48 2007
@@ -19,7 +19,6 @@
*
*/
#include <HeadersExchange.h>
-#include <ExchangeBinding.h>
#include <Value.h>
#include <QpidError.h>
#include <algorithm>
@@ -43,14 +42,13 @@
HeadersExchange::HeadersExchange(const string& _name) : Exchange(_name) { }
-void HeadersExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args){
+void HeadersExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){
Mutex::ScopedLock locker(lock);
std::string what = args->getString("x-match");
if (what != all && what != any) {
THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid x-match value binding to headers exchange.");
}
bindings.push_back(Binding(*args, queue));
- queue->bound(new ExchangeBinding(this, queue, routingKey, args));
}
void HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/Makefile.am?view=diff&rev=523857&r1=523856&r2=523857
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/Makefile.am Thu Mar 29 15:30:48 2007
@@ -42,8 +42,6 @@
DeliveryRecord.h \
DirectExchange.cpp \
DirectExchange.h \
- ExchangeBinding.cpp \
- ExchangeBinding.h \
ExchangeRegistry.cpp \
ExchangeRegistry.h \
FanOutExchange.cpp \
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/TopicExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/TopicExchange.cpp?view=diff&rev=523857&r1=523856&r2=523857
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/TopicExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/TopicExchange.cpp Thu Mar 29 15:30:48 2007
@@ -19,7 +19,6 @@
*
*/
#include <TopicExchange.h>
-#include <ExchangeBinding.h>
#include <algorithm>
using namespace qpid::broker;
@@ -118,11 +117,10 @@
TopicExchange::TopicExchange(const string& _name) : Exchange(_name) { }
-void TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args){
+void TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
Monitor::ScopedLock l(lock);
TopicPattern routingPattern(routingKey);
bindings[routingPattern].push_back(queue);
- queue->bound(new ExchangeBinding(this, queue, routingKey, args));
}
void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
Modified: incubator/qpid/trunk/qpid/cpp/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/QueueTest.cpp?view=diff&rev=523857&r1=523856&r2=523857
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/QueueTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/tests/QueueTest.cpp Thu Mar 29 15:30:48 2007
@@ -28,15 +28,6 @@
using namespace qpid::sys;
-class TestBinding : public virtual Binding{
- bool cancelled;
-
-public:
- TestBinding();
- virtual void cancel();
- bool isCancelled();
-};
-
class TestConsumer : public virtual Consumer{
public:
Message::shared_ptr last;
@@ -49,7 +40,6 @@
{
CPPUNIT_TEST_SUITE(QueueTest);
CPPUNIT_TEST(testConsumers);
- CPPUNIT_TEST(testBinding);
CPPUNIT_TEST(testRegistry);
CPPUNIT_TEST(testDequeue);
CPPUNIT_TEST_SUITE_END();
@@ -93,20 +83,6 @@
CPPUNIT_ASSERT_EQUAL(uint32_t(0), queue->getConsumerCount());
}
- void testBinding(){
- Queue::shared_ptr queue(new Queue("my_queue", true));
- //Test bindings:
- TestBinding a;
- TestBinding b;
- queue->bound(&a);
- queue->bound(&b);
-
- queue.reset();
-
- CPPUNIT_ASSERT(a.isCancelled());
- CPPUNIT_ASSERT(b.isCancelled());
- }
-
void testRegistry(){
//Test use of queues in registry:
QueueRegistry registry;
@@ -164,18 +140,6 @@
// Make this test suite a plugin.
CPPUNIT_PLUGIN_IMPLEMENT();
CPPUNIT_TEST_SUITE_REGISTRATION(QueueTest);
-
-//TestBinding
-TestBinding::TestBinding() : cancelled(false) {}
-
-void TestBinding::cancel(){
- CPPUNIT_ASSERT(!cancelled);
- cancelled = true;
-}
-
-bool TestBinding::isCancelled(){
- return cancelled;
-}
//TestConsumer
bool TestConsumer::deliver(Message::shared_ptr& msg){