You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/04/03 20:50:45 UTC

svn commit: r525244 - in /incubator/qpid/trunk/qpid/cpp/src: broker/Broker.cpp broker/BrokerMessageMessage.cpp broker/BrokerMessageMessage.h tests/ReferenceTest.cpp

Author: aconway
Date: Tue Apr  3 11:50:43 2007
New Revision: 525244

URL: http://svn.apache.org/viewvc?view=rev&rev=525244
Log:

* cpp/src/broker/Broker.cpp: Join cleaner thread.

* cpp/src/broker/BrokerMessageMessage.h, .cpp, ReferenceTest: 
 - Broke reference cycle between broker::MessageMessage and Reference
   by using a weak_ptr in MessageMessage

Modified:
    incubator/qpid/trunk/qpid/cpp/src/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.cpp
    incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.h
    incubator/qpid/trunk/qpid/cpp/src/tests/ReferenceTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/broker/Broker.cpp?view=diff&rev=525244&r1=525243&r2=525244
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/broker/Broker.cpp Tue Apr  3 11:50:43 2007
@@ -95,6 +95,7 @@
 void Broker::shutdown() {
     if (acceptor)
         acceptor->shutdown();
+    cleaner.stop();
 }
 
 Broker::~Broker() {

Modified: incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.cpp?view=diff&rev=525244&r1=525243&r2=525244
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.cpp Tue Apr  3 11:50:43 2007
@@ -48,7 +48,9 @@
             transfer_),
     requestId(requestId_),
     transfer(transfer_)
-{}
+{
+    assert(transfer->getBody().isInline());
+}
 
 MessageMessage::MessageMessage(
     ConnectionToken* publisher, RequestId requestId_, TransferPtr transfer_,
@@ -61,7 +63,10 @@
     requestId(requestId_),
     transfer(transfer_),
     reference(reference_)
-{}
+{
+    assert(!transfer->getBody().isInline());
+    assert(reference_);
+}
 
 /**
  * Currently used by message store impls to recover messages 
@@ -74,101 +79,100 @@
     const std::string& consumerTag, 
     uint32_t framesize)
 {	
-	const framing::Content& body = transfer->getBody();
-
-	// Send any reference data
-	if (!body.isInline()){
-		// Open
-		channel.send(new MessageOpenBody(channel.getVersion(), reference->getId()));
-		// Appends
-		for(Reference::Appends::const_iterator a = reference->getAppends().begin();
-			a != reference->getAppends().end();
-			++a) {
-			uint32_t sizeleft = (*a)->size();
-			const string& content = (*a)->getBytes();
-			// Calculate overhead bytes
-			// Assume that the overhead is constant as the reference name doesn't change
-			uint32_t overhead = sizeleft - content.size();
-			string::size_type contentStart = 0;
-			while (sizeleft) {
-				string::size_type contentSize = sizeleft <= framesize ? sizeleft : framesize-overhead;
-				channel.send(new MessageAppendBody(channel.getVersion(), reference->getId(),
-						string(content, contentStart, contentSize)));
-					sizeleft -= contentSize;
-					contentStart += contentSize;
-			}
-		}
-	}
+    const framing::Content& body = transfer->getBody();
+    // Send any reference data
+    ReferencePtr ref= getReference();
+    if (ref){
+
+        // Open
+        channel.send(new MessageOpenBody(channel.getVersion(), ref->getId()));
+        // Appends
+        for(Reference::Appends::const_iterator a = ref->getAppends().begin();
+            a != ref->getAppends().end();
+            ++a) {
+            uint32_t sizeleft = (*a)->size();
+            const string& content = (*a)->getBytes();
+            // Calculate overhead bytes
+            // Assume that the overhead is constant as the reference name doesn't change
+            uint32_t overhead = sizeleft - content.size();
+            string::size_type contentStart = 0;
+            while (sizeleft) {
+                string::size_type contentSize = sizeleft <= framesize ? sizeleft : framesize-overhead;
+                channel.send(new MessageAppendBody(channel.getVersion(), ref->getId(),
+                                                   string(content, contentStart, contentSize)));
+                sizeleft -= contentSize;
+                contentStart += contentSize;
+            }
+        }
+    }
 	
-	// The transfer
-	if ( transfer->size()<=framesize ) {
+    // The transfer
+    if ( transfer->size()<=framesize ) {
     	channel.send(
-    	new MessageTransferBody(channel.getVersion(), 
-                                transfer->getTicket(),
-                                consumerTag,
-                                getRedelivered(),
-                                transfer->getImmediate(),
-                                transfer->getTtl(),
-                                transfer->getPriority(),
-                                transfer->getTimestamp(),
-                                transfer->getDeliveryMode(),
-                                transfer->getExpiration(),
-                                getExchange(),
-                                getRoutingKey(),
-                                transfer->getMessageId(),
-                                transfer->getCorrelationId(),
-                                transfer->getReplyTo(),
-                                transfer->getContentType(),
-                                transfer->getContentEncoding(),
-                                transfer->getUserId(),
-                                transfer->getAppId(),
-                                transfer->getTransactionId(),
-                                transfer->getSecurityToken(),
-                                transfer->getApplicationHeaders(),
-                                body,
-                                transfer->getMandatory()));
-	} else {
-		// Thing to do here is to construct a simple reference message then deliver that instead
-		// fragmentation will be taken care of in the delivery if necessary;
-		string content = body.getValue();
-		string refname = "dummy";
-		TransferPtr newTransfer(
-	    	new MessageTransferBody(channel.getVersion(), 
-	                                transfer->getTicket(),
-	                                consumerTag,
-	                                getRedelivered(),
-	                                transfer->getImmediate(),
-	                                transfer->getTtl(),
-	                                transfer->getPriority(),
-	                                transfer->getTimestamp(),
-	                                transfer->getDeliveryMode(),
-	                                transfer->getExpiration(),
-	                                getExchange(),
-	                                getRoutingKey(),
-	                                transfer->getMessageId(),
-	                                transfer->getCorrelationId(),
-	                                transfer->getReplyTo(),
-	                                transfer->getContentType(),
-	                                transfer->getContentEncoding(),
-	                                transfer->getUserId(),
-	                                transfer->getAppId(),
-	                                transfer->getTransactionId(),
-	                                transfer->getSecurityToken(),
-	                                transfer->getApplicationHeaders(),
-	                                framing::Content(REFERENCE, refname),
-	                                transfer->getMandatory()));
-		ReferencePtr newRef(new Reference(refname));
-		Reference::AppendPtr newAppend(new MessageAppendBody(channel.getVersion(), refname, content));
-		newRef->append(newAppend);
-		MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), 0, newTransfer, newRef);
-		newMsg.transferMessage(channel, consumerTag, framesize);
-		return;
-	}
-	// Close any reference data
-	if (!body.isInline()){
-		// Close
-		channel.send(new MessageCloseBody(channel.getVersion(), reference->getId()));
-	}
+            new MessageTransferBody(channel.getVersion(), 
+                                    transfer->getTicket(),
+                                    consumerTag,
+                                    getRedelivered(),
+                                    transfer->getImmediate(),
+                                    transfer->getTtl(),
+                                    transfer->getPriority(),
+                                    transfer->getTimestamp(),
+                                    transfer->getDeliveryMode(),
+                                    transfer->getExpiration(),
+                                    getExchange(),
+                                    getRoutingKey(),
+                                    transfer->getMessageId(),
+                                    transfer->getCorrelationId(),
+                                    transfer->getReplyTo(),
+                                    transfer->getContentType(),
+                                    transfer->getContentEncoding(),
+                                    transfer->getUserId(),
+                                    transfer->getAppId(),
+                                    transfer->getTransactionId(),
+                                    transfer->getSecurityToken(),
+                                    transfer->getApplicationHeaders(),
+                                    body,
+                                    transfer->getMandatory()));
+    } else {
+        // Thing to do here is to construct a simple reference message then deliver that instead
+        // fragmentation will be taken care of in the delivery if necessary;
+        string content = body.getValue();
+        string refname = "dummy";
+        TransferPtr newTransfer(
+            new MessageTransferBody(channel.getVersion(), 
+                                    transfer->getTicket(),
+                                    consumerTag,
+                                    getRedelivered(),
+                                    transfer->getImmediate(),
+                                    transfer->getTtl(),
+                                    transfer->getPriority(),
+                                    transfer->getTimestamp(),
+                                    transfer->getDeliveryMode(),
+                                    transfer->getExpiration(),
+                                    getExchange(),
+                                    getRoutingKey(),
+                                    transfer->getMessageId(),
+                                    transfer->getCorrelationId(),
+                                    transfer->getReplyTo(),
+                                    transfer->getContentType(),
+                                    transfer->getContentEncoding(),
+                                    transfer->getUserId(),
+                                    transfer->getAppId(),
+                                    transfer->getTransactionId(),
+                                    transfer->getSecurityToken(),
+                                    transfer->getApplicationHeaders(),
+                                    framing::Content(REFERENCE, refname),
+                                    transfer->getMandatory()));
+        ReferencePtr newRef(new Reference(refname));
+        Reference::AppendPtr newAppend(new MessageAppendBody(channel.getVersion(), refname, content));
+        newRef->append(newAppend);
+        MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), 0, newTransfer, newRef);
+        newMsg.transferMessage(channel, consumerTag, framesize);
+        return;
+    }
+    // Close any reference data
+    if (ref)
+        channel.send(new MessageCloseBody(channel.getVersion(), ref->getId()));
 }
 
 void MessageMessage::deliver(
@@ -177,18 +181,18 @@
     uint64_t /*deliveryTag*/, 
     uint32_t framesize)
 {
-	transferMessage(channel, consumerTag, framesize);
+    transferMessage(channel, consumerTag, framesize);
 }
 
 void MessageMessage::sendGetOk(
     const framing::MethodContext& context,
-	const std::string& destination,
+    const std::string& destination,
     uint32_t /*messageCount*/,
     uint64_t /*deliveryTag*/, 
     uint32_t framesize)
 {
-	framing::ChannelAdapter* channel = context.channel;
-	transferMessage(*channel, destination, framesize);
+    framing::ChannelAdapter* channel = context.channel;
+    transferMessage(*channel, destination, framesize);
 }
 
 bool MessageMessage::isComplete()
@@ -198,10 +202,12 @@
 
 uint64_t MessageMessage::contentSize() const
 {
-	if (transfer->getBody().isInline())
-	    return transfer->getBody().getValue().size();
-	else
-    	return reference->getSize();		 
+    if (transfer->getBody().isInline())
+        return transfer->getBody().getValue().size();
+    else {
+        assert(getReference());
+    	return getReference()->getSize();
+    }
 }
 
 qpid::framing::BasicHeaderProperties* MessageMessage::getHeaderProperties()
@@ -249,8 +255,10 @@
     if (transfer->getBody().isInline()) {
         transfer->encodeContent(buffer);
     } else {
+        assert(getReference());
         string data;
-        for(Reference::Appends::const_iterator a = reference->getAppends().begin(); a != reference->getAppends().end(); ++a) {
+        const Reference::Appends& appends = getReference()->getAppends();
+        for(Reference::Appends::const_iterator a = appends.begin(); a != appends.end(); ++a) {
             data += (*a)->getBytes();
         }
         framing::Content body(INLINE, data);
@@ -302,4 +310,11 @@
                                    transfer->getMandatory());
 
 }
+
+MessageMessage::ReferencePtr MessageMessage::getReference() const {
+    return reference.lock();
+}
+
+
 }} // namespace qpid::broker
+

Modified: incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.h?view=diff&rev=525244&r1=525243&r2=525244
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.h Tue Apr  3 11:50:43 2007
@@ -24,7 +24,7 @@
 #include "BrokerMessageBase.h"
 #include "MessageTransferBody.h"
 #include "../framing/amqp_types.h"
-
+#include <boost/weak_ptr.hpp>
 #include <vector>
 
 namespace qpid {
@@ -42,16 +42,16 @@
     typedef boost::shared_ptr<MessageMessage> shared_ptr;
     typedef boost::shared_ptr<framing::MessageTransferBody> TransferPtr;
     typedef boost::shared_ptr<Reference> ReferencePtr;
-
+    
     MessageMessage(ConnectionToken* publisher, framing::RequestId, TransferPtr transfer);
     MessageMessage(ConnectionToken* publisher, framing::RequestId, TransferPtr transfer, ReferencePtr reference);
     MessageMessage();
             
     // Default destructor okay
 
-    framing::RequestId getRequestId() {return requestId; }
-    TransferPtr getTransfer() { return transfer; }
-    ReferencePtr getReference() { return reference; }
+    framing::RequestId getRequestId() const {return requestId; }
+    TransferPtr getTransfer() const { return transfer; }
+    ReferencePtr getReference() const ;
     
     void deliver(framing::ChannelAdapter& channel, 
                  const std::string& consumerTag, 
@@ -81,16 +81,19 @@
     void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0);
 
   private:
-    void transferMessage(framing::ChannelAdapter& channel, 
-                         const std::string& consumerTag, 
-                         uint32_t framesize);
-    framing::MessageTransferBody* copyTransfer(const framing::ProtocolVersion& version,
-                                               const std::string& destination, 
-                                               const framing::Content& body) const;
+    void transferMessage(
+        framing::ChannelAdapter& channel, 
+        const std::string& consumerTag, 
+        uint32_t framesize);
+    
+    framing::MessageTransferBody* copyTransfer(
+        const framing::ProtocolVersion& version,
+        const std::string& destination, 
+        const framing::Content& body) const;
   
     framing::RequestId requestId;
     const TransferPtr transfer;
-    const ReferencePtr reference;
+    const boost::weak_ptr<Reference> reference;
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ReferenceTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ReferenceTest.cpp?view=diff&rev=525244&r1=525243&r2=525244
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ReferenceTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ReferenceTest.cpp Tue Apr  3 11:50:43 2007
@@ -28,6 +28,7 @@
 #include "../broker/CompletionHandler.h"
 
 using namespace boost;
+using namespace qpid;
 using namespace qpid::broker;
 using namespace qpid::framing;
 using namespace std;
@@ -41,22 +42,8 @@
 
     ProtocolVersion v;
     ReferenceRegistry registry;
-    Reference::shared_ptr r1;
-    MessageTransferBody::shared_ptr t1, t2;
-    MessageMessage::shared_ptr m1, m2;
-    MessageAppendBody::shared_ptr a1, a2;
-  public:
-
-    ReferenceTest() :
-        r1(registry.open("bar")),
-        t1(new MessageTransferBody(v)),
-        t2(new MessageTransferBody(v)),
-        m1(new MessageMessage(0, 1, t1, r1)),
-        m2(new MessageMessage(0, 2, t2, r1)),
-        a1(new MessageAppendBody(v)),
-        a2(new MessageAppendBody(v))
-    {}
 
+  public:
     void testRegistry() {
         Reference::shared_ptr ref = registry.open("foo");
         CPPUNIT_ASSERT_EQUAL(string("foo"), ref->getId());
@@ -69,32 +56,43 @@
             registry.open("foo");
             CPPUNIT_FAIL("Expected exception");
         } catch(...) {}
+        ref->close();
+        try {
+            registry.get("foo");
+            CPPUNIT_FAIL("Expected exception");
+        } catch(...) {}
     }
 
     void testReference() {
+
+        Reference::shared_ptr r1(registry.open("bar"));
+
+        MessageTransferBody::shared_ptr t1(new MessageTransferBody(v));
+        // TODO aconway 2007-04-03: hack around lack of generated setters. Clean this up.
+        const_cast<framing::Content&>(t1->getBody()) = framing::Content(REFERENCE,"bar");
+        MessageMessage::shared_ptr m1(new MessageMessage(0, 1, t1, r1));
+
+        MessageTransferBody::shared_ptr  t2(new MessageTransferBody(v));
+        const_cast<framing::Content&>(t2->getBody()) = framing::Content(REFERENCE,"bar");
+        MessageMessage::shared_ptr m2(new MessageMessage(0, 2, t2, r1));
+        
+        MessageAppendBody::shared_ptr a1(new MessageAppendBody(v));
+        MessageAppendBody::shared_ptr a2(new MessageAppendBody(v));
+
         r1->addMessage(m1);
         r1->addMessage(m2);
         CPPUNIT_ASSERT_EQUAL(size_t(2), r1->getMessages().size());
         r1->append(a1);
         r1->append(a2);
         CPPUNIT_ASSERT_EQUAL(size_t(2), r1->getAppends().size());
-        const vector<MessageMessage::shared_ptr> messages = r1->getMessages();
         r1->close();
-        try {
-            registry.open("bar");
-            CPPUNIT_FAIL("Expected exception");
-        } catch(...) {}
 
-        CPPUNIT_ASSERT_EQUAL(messages[0], m1);
-        CPPUNIT_ASSERT_EQUAL(messages[0]->getReference()->getAppends()[0], a1);
-        CPPUNIT_ASSERT_EQUAL(messages[0]->getReference()->getAppends()[1], a2);
-
-        CPPUNIT_ASSERT_EQUAL(messages[1], m2);
-        CPPUNIT_ASSERT_EQUAL(messages[1]->getReference()->getAppends()[0], a1);
-        CPPUNIT_ASSERT_EQUAL(messages[1]->getReference()->getAppends()[1], a2);
+        CPPUNIT_ASSERT_EQUAL(m1->getReference()->getAppends()[0], a1);
+        CPPUNIT_ASSERT_EQUAL(m1->getReference()->getAppends()[1], a2);
+
+        CPPUNIT_ASSERT_EQUAL(m2->getReference()->getAppends()[0], a1);
+        CPPUNIT_ASSERT_EQUAL(m2->getReference()->getAppends()[1], a2);
     }
-                             
-    
 };
 
 // Make this test suite a plugin.