You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/04/03 20:50:45 UTC
svn commit: r525244 - in /incubator/qpid/trunk/qpid/cpp/src:
broker/Broker.cpp broker/BrokerMessageMessage.cpp
broker/BrokerMessageMessage.h tests/ReferenceTest.cpp
Author: aconway
Date: Tue Apr 3 11:50:43 2007
New Revision: 525244
URL: http://svn.apache.org/viewvc?view=rev&rev=525244
Log:
* cpp/src/broker/Broker.cpp: Join cleaner thread.
* cpp/src/broker/BrokerMessageMessage.h, .cpp, ReferenceTest:
- Broke reference cycle between broker::MessageMessage and Reference
by using a weak_ptr in MessageMessage
Modified:
incubator/qpid/trunk/qpid/cpp/src/broker/Broker.cpp
incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.cpp
incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.h
incubator/qpid/trunk/qpid/cpp/src/tests/ReferenceTest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/broker/Broker.cpp?view=diff&rev=525244&r1=525243&r2=525244
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/broker/Broker.cpp Tue Apr 3 11:50:43 2007
@@ -95,6 +95,7 @@
void Broker::shutdown() {
if (acceptor)
acceptor->shutdown();
+ cleaner.stop();
}
Broker::~Broker() {
Modified: incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.cpp?view=diff&rev=525244&r1=525243&r2=525244
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.cpp Tue Apr 3 11:50:43 2007
@@ -48,7 +48,9 @@
transfer_),
requestId(requestId_),
transfer(transfer_)
-{}
+{
+ assert(transfer->getBody().isInline());
+}
MessageMessage::MessageMessage(
ConnectionToken* publisher, RequestId requestId_, TransferPtr transfer_,
@@ -61,7 +63,10 @@
requestId(requestId_),
transfer(transfer_),
reference(reference_)
-{}
+{
+ assert(!transfer->getBody().isInline());
+ assert(reference_);
+}
/**
* Currently used by message store impls to recover messages
@@ -74,101 +79,100 @@
const std::string& consumerTag,
uint32_t framesize)
{
- const framing::Content& body = transfer->getBody();
-
- // Send any reference data
- if (!body.isInline()){
- // Open
- channel.send(new MessageOpenBody(channel.getVersion(), reference->getId()));
- // Appends
- for(Reference::Appends::const_iterator a = reference->getAppends().begin();
- a != reference->getAppends().end();
- ++a) {
- uint32_t sizeleft = (*a)->size();
- const string& content = (*a)->getBytes();
- // Calculate overhead bytes
- // Assume that the overhead is constant as the reference name doesn't change
- uint32_t overhead = sizeleft - content.size();
- string::size_type contentStart = 0;
- while (sizeleft) {
- string::size_type contentSize = sizeleft <= framesize ? sizeleft : framesize-overhead;
- channel.send(new MessageAppendBody(channel.getVersion(), reference->getId(),
- string(content, contentStart, contentSize)));
- sizeleft -= contentSize;
- contentStart += contentSize;
- }
- }
- }
+ const framing::Content& body = transfer->getBody();
+ // Send any reference data
+ ReferencePtr ref= getReference();
+ if (ref){
+
+ // Open
+ channel.send(new MessageOpenBody(channel.getVersion(), ref->getId()));
+ // Appends
+ for(Reference::Appends::const_iterator a = ref->getAppends().begin();
+ a != ref->getAppends().end();
+ ++a) {
+ uint32_t sizeleft = (*a)->size();
+ const string& content = (*a)->getBytes();
+ // Calculate overhead bytes
+ // Assume that the overhead is constant as the reference name doesn't change
+ uint32_t overhead = sizeleft - content.size();
+ string::size_type contentStart = 0;
+ while (sizeleft) {
+ string::size_type contentSize = sizeleft <= framesize ? sizeleft : framesize-overhead;
+ channel.send(new MessageAppendBody(channel.getVersion(), ref->getId(),
+ string(content, contentStart, contentSize)));
+ sizeleft -= contentSize;
+ contentStart += contentSize;
+ }
+ }
+ }
- // The transfer
- if ( transfer->size()<=framesize ) {
+ // The transfer
+ if ( transfer->size()<=framesize ) {
channel.send(
- new MessageTransferBody(channel.getVersion(),
- transfer->getTicket(),
- consumerTag,
- getRedelivered(),
- transfer->getImmediate(),
- transfer->getTtl(),
- transfer->getPriority(),
- transfer->getTimestamp(),
- transfer->getDeliveryMode(),
- transfer->getExpiration(),
- getExchange(),
- getRoutingKey(),
- transfer->getMessageId(),
- transfer->getCorrelationId(),
- transfer->getReplyTo(),
- transfer->getContentType(),
- transfer->getContentEncoding(),
- transfer->getUserId(),
- transfer->getAppId(),
- transfer->getTransactionId(),
- transfer->getSecurityToken(),
- transfer->getApplicationHeaders(),
- body,
- transfer->getMandatory()));
- } else {
- // Thing to do here is to construct a simple reference message then deliver that instead
- // fragmentation will be taken care of in the delivery if necessary;
- string content = body.getValue();
- string refname = "dummy";
- TransferPtr newTransfer(
- new MessageTransferBody(channel.getVersion(),
- transfer->getTicket(),
- consumerTag,
- getRedelivered(),
- transfer->getImmediate(),
- transfer->getTtl(),
- transfer->getPriority(),
- transfer->getTimestamp(),
- transfer->getDeliveryMode(),
- transfer->getExpiration(),
- getExchange(),
- getRoutingKey(),
- transfer->getMessageId(),
- transfer->getCorrelationId(),
- transfer->getReplyTo(),
- transfer->getContentType(),
- transfer->getContentEncoding(),
- transfer->getUserId(),
- transfer->getAppId(),
- transfer->getTransactionId(),
- transfer->getSecurityToken(),
- transfer->getApplicationHeaders(),
- framing::Content(REFERENCE, refname),
- transfer->getMandatory()));
- ReferencePtr newRef(new Reference(refname));
- Reference::AppendPtr newAppend(new MessageAppendBody(channel.getVersion(), refname, content));
- newRef->append(newAppend);
- MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), 0, newTransfer, newRef);
- newMsg.transferMessage(channel, consumerTag, framesize);
- return;
- }
- // Close any reference data
- if (!body.isInline()){
- // Close
- channel.send(new MessageCloseBody(channel.getVersion(), reference->getId()));
- }
+ new MessageTransferBody(channel.getVersion(),
+ transfer->getTicket(),
+ consumerTag,
+ getRedelivered(),
+ transfer->getImmediate(),
+ transfer->getTtl(),
+ transfer->getPriority(),
+ transfer->getTimestamp(),
+ transfer->getDeliveryMode(),
+ transfer->getExpiration(),
+ getExchange(),
+ getRoutingKey(),
+ transfer->getMessageId(),
+ transfer->getCorrelationId(),
+ transfer->getReplyTo(),
+ transfer->getContentType(),
+ transfer->getContentEncoding(),
+ transfer->getUserId(),
+ transfer->getAppId(),
+ transfer->getTransactionId(),
+ transfer->getSecurityToken(),
+ transfer->getApplicationHeaders(),
+ body,
+ transfer->getMandatory()));
+ } else {
+ // Thing to do here is to construct a simple reference message then deliver that instead
+ // fragmentation will be taken care of in the delivery if necessary;
+ string content = body.getValue();
+ string refname = "dummy";
+ TransferPtr newTransfer(
+ new MessageTransferBody(channel.getVersion(),
+ transfer->getTicket(),
+ consumerTag,
+ getRedelivered(),
+ transfer->getImmediate(),
+ transfer->getTtl(),
+ transfer->getPriority(),
+ transfer->getTimestamp(),
+ transfer->getDeliveryMode(),
+ transfer->getExpiration(),
+ getExchange(),
+ getRoutingKey(),
+ transfer->getMessageId(),
+ transfer->getCorrelationId(),
+ transfer->getReplyTo(),
+ transfer->getContentType(),
+ transfer->getContentEncoding(),
+ transfer->getUserId(),
+ transfer->getAppId(),
+ transfer->getTransactionId(),
+ transfer->getSecurityToken(),
+ transfer->getApplicationHeaders(),
+ framing::Content(REFERENCE, refname),
+ transfer->getMandatory()));
+ ReferencePtr newRef(new Reference(refname));
+ Reference::AppendPtr newAppend(new MessageAppendBody(channel.getVersion(), refname, content));
+ newRef->append(newAppend);
+ MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), 0, newTransfer, newRef);
+ newMsg.transferMessage(channel, consumerTag, framesize);
+ return;
+ }
+ // Close any reference data
+ if (ref)
+ channel.send(new MessageCloseBody(channel.getVersion(), ref->getId()));
}
void MessageMessage::deliver(
@@ -177,18 +181,18 @@
uint64_t /*deliveryTag*/,
uint32_t framesize)
{
- transferMessage(channel, consumerTag, framesize);
+ transferMessage(channel, consumerTag, framesize);
}
void MessageMessage::sendGetOk(
const framing::MethodContext& context,
- const std::string& destination,
+ const std::string& destination,
uint32_t /*messageCount*/,
uint64_t /*deliveryTag*/,
uint32_t framesize)
{
- framing::ChannelAdapter* channel = context.channel;
- transferMessage(*channel, destination, framesize);
+ framing::ChannelAdapter* channel = context.channel;
+ transferMessage(*channel, destination, framesize);
}
bool MessageMessage::isComplete()
@@ -198,10 +202,12 @@
uint64_t MessageMessage::contentSize() const
{
- if (transfer->getBody().isInline())
- return transfer->getBody().getValue().size();
- else
- return reference->getSize();
+ if (transfer->getBody().isInline())
+ return transfer->getBody().getValue().size();
+ else {
+ assert(getReference());
+ return getReference()->getSize();
+ }
}
qpid::framing::BasicHeaderProperties* MessageMessage::getHeaderProperties()
@@ -249,8 +255,10 @@
if (transfer->getBody().isInline()) {
transfer->encodeContent(buffer);
} else {
+ assert(getReference());
string data;
- for(Reference::Appends::const_iterator a = reference->getAppends().begin(); a != reference->getAppends().end(); ++a) {
+ const Reference::Appends& appends = getReference()->getAppends();
+ for(Reference::Appends::const_iterator a = appends.begin(); a != appends.end(); ++a) {
data += (*a)->getBytes();
}
framing::Content body(INLINE, data);
@@ -302,4 +310,11 @@
transfer->getMandatory());
}
+
+MessageMessage::ReferencePtr MessageMessage::getReference() const {
+ return reference.lock();
+}
+
+
}} // namespace qpid::broker
+
Modified: incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.h?view=diff&rev=525244&r1=525243&r2=525244
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.h Tue Apr 3 11:50:43 2007
@@ -24,7 +24,7 @@
#include "BrokerMessageBase.h"
#include "MessageTransferBody.h"
#include "../framing/amqp_types.h"
-
+#include <boost/weak_ptr.hpp>
#include <vector>
namespace qpid {
@@ -42,16 +42,16 @@
typedef boost::shared_ptr<MessageMessage> shared_ptr;
typedef boost::shared_ptr<framing::MessageTransferBody> TransferPtr;
typedef boost::shared_ptr<Reference> ReferencePtr;
-
+
MessageMessage(ConnectionToken* publisher, framing::RequestId, TransferPtr transfer);
MessageMessage(ConnectionToken* publisher, framing::RequestId, TransferPtr transfer, ReferencePtr reference);
MessageMessage();
// Default destructor okay
- framing::RequestId getRequestId() {return requestId; }
- TransferPtr getTransfer() { return transfer; }
- ReferencePtr getReference() { return reference; }
+ framing::RequestId getRequestId() const {return requestId; }
+ TransferPtr getTransfer() const { return transfer; }
+ ReferencePtr getReference() const ;
void deliver(framing::ChannelAdapter& channel,
const std::string& consumerTag,
@@ -81,16 +81,19 @@
void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0);
private:
- void transferMessage(framing::ChannelAdapter& channel,
- const std::string& consumerTag,
- uint32_t framesize);
- framing::MessageTransferBody* copyTransfer(const framing::ProtocolVersion& version,
- const std::string& destination,
- const framing::Content& body) const;
+ void transferMessage(
+ framing::ChannelAdapter& channel,
+ const std::string& consumerTag,
+ uint32_t framesize);
+
+ framing::MessageTransferBody* copyTransfer(
+ const framing::ProtocolVersion& version,
+ const std::string& destination,
+ const framing::Content& body) const;
framing::RequestId requestId;
const TransferPtr transfer;
- const ReferencePtr reference;
+ const boost::weak_ptr<Reference> reference;
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ReferenceTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ReferenceTest.cpp?view=diff&rev=525244&r1=525243&r2=525244
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ReferenceTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ReferenceTest.cpp Tue Apr 3 11:50:43 2007
@@ -28,6 +28,7 @@
#include "../broker/CompletionHandler.h"
using namespace boost;
+using namespace qpid;
using namespace qpid::broker;
using namespace qpid::framing;
using namespace std;
@@ -41,22 +42,8 @@
ProtocolVersion v;
ReferenceRegistry registry;
- Reference::shared_ptr r1;
- MessageTransferBody::shared_ptr t1, t2;
- MessageMessage::shared_ptr m1, m2;
- MessageAppendBody::shared_ptr a1, a2;
- public:
-
- ReferenceTest() :
- r1(registry.open("bar")),
- t1(new MessageTransferBody(v)),
- t2(new MessageTransferBody(v)),
- m1(new MessageMessage(0, 1, t1, r1)),
- m2(new MessageMessage(0, 2, t2, r1)),
- a1(new MessageAppendBody(v)),
- a2(new MessageAppendBody(v))
- {}
+ public:
void testRegistry() {
Reference::shared_ptr ref = registry.open("foo");
CPPUNIT_ASSERT_EQUAL(string("foo"), ref->getId());
@@ -69,32 +56,43 @@
registry.open("foo");
CPPUNIT_FAIL("Expected exception");
} catch(...) {}
+ ref->close();
+ try {
+ registry.get("foo");
+ CPPUNIT_FAIL("Expected exception");
+ } catch(...) {}
}
void testReference() {
+
+ Reference::shared_ptr r1(registry.open("bar"));
+
+ MessageTransferBody::shared_ptr t1(new MessageTransferBody(v));
+ // TODO aconway 2007-04-03: hack around lack of generated setters. Clean this up.
+ const_cast<framing::Content&>(t1->getBody()) = framing::Content(REFERENCE,"bar");
+ MessageMessage::shared_ptr m1(new MessageMessage(0, 1, t1, r1));
+
+ MessageTransferBody::shared_ptr t2(new MessageTransferBody(v));
+ const_cast<framing::Content&>(t2->getBody()) = framing::Content(REFERENCE,"bar");
+ MessageMessage::shared_ptr m2(new MessageMessage(0, 2, t2, r1));
+
+ MessageAppendBody::shared_ptr a1(new MessageAppendBody(v));
+ MessageAppendBody::shared_ptr a2(new MessageAppendBody(v));
+
r1->addMessage(m1);
r1->addMessage(m2);
CPPUNIT_ASSERT_EQUAL(size_t(2), r1->getMessages().size());
r1->append(a1);
r1->append(a2);
CPPUNIT_ASSERT_EQUAL(size_t(2), r1->getAppends().size());
- const vector<MessageMessage::shared_ptr> messages = r1->getMessages();
r1->close();
- try {
- registry.open("bar");
- CPPUNIT_FAIL("Expected exception");
- } catch(...) {}
- CPPUNIT_ASSERT_EQUAL(messages[0], m1);
- CPPUNIT_ASSERT_EQUAL(messages[0]->getReference()->getAppends()[0], a1);
- CPPUNIT_ASSERT_EQUAL(messages[0]->getReference()->getAppends()[1], a2);
-
- CPPUNIT_ASSERT_EQUAL(messages[1], m2);
- CPPUNIT_ASSERT_EQUAL(messages[1]->getReference()->getAppends()[0], a1);
- CPPUNIT_ASSERT_EQUAL(messages[1]->getReference()->getAppends()[1], a2);
+ CPPUNIT_ASSERT_EQUAL(m1->getReference()->getAppends()[0], a1);
+ CPPUNIT_ASSERT_EQUAL(m1->getReference()->getAppends()[1], a2);
+
+ CPPUNIT_ASSERT_EQUAL(m2->getReference()->getAppends()[0], a1);
+ CPPUNIT_ASSERT_EQUAL(m2->getReference()->getAppends()[1], a2);
}
-
-
};
// Make this test suite a plugin.