You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/04/05 23:23:15 UTC
svn commit: r525964 - in /incubator/qpid/trunk/qpid/cpp/src:
broker/BrokerMessageMessage.cpp broker/BrokerMessageMessage.h
broker/Reference.cpp client/MessageMessageChannel.cpp shared_ptr.h
tests/ClientChannelTest.cpp
Author: aconway
Date: Thu Apr 5 14:23:14 2007
New Revision: 525964
URL: http://svn.apache.org/viewvc?view=rev&rev=525964
Log:
* cpp/src/broker/BrokerMessageMessage.h: Change reference from weak_ptr to
shared_ptr. Broker messages hold their reference.
* cpp/src/broker/Reference.cpp (close): clear messages array to break
shared_ptr cycle and avoid a leak.
* cpp/src/client/MessageMessageChannel.cpp (publish): Support references
for large messages.
* cpp/src/shared_ptr.h (make_shared_ptr): added deleter variant.
* cpp/src/tests/ClientChannelTest.cpp: Enabled testGetNoContent,
testGetFragmentedMessage
Modified:
incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.cpp
incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.h
incubator/qpid/trunk/qpid/cpp/src/broker/Reference.cpp
incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.cpp
incubator/qpid/trunk/qpid/cpp/src/shared_ptr.h
incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.cpp?view=diff&rev=525964&r1=525963&r2=525964
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.cpp Thu Apr 5 14:23:14 2007
@@ -312,7 +312,7 @@
}
MessageMessage::ReferencePtr MessageMessage::getReference() const {
- return reference.lock();
+ return reference;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.h?view=diff&rev=525964&r1=525963&r2=525964
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.h Thu Apr 5 14:23:14 2007
@@ -93,7 +93,7 @@
framing::RequestId requestId;
const TransferPtr transfer;
- const boost::weak_ptr<Reference> reference;
+ const boost::shared_ptr<Reference> reference;
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/broker/Reference.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/broker/Reference.cpp?view=diff&rev=525964&r1=525963&r2=525964
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/broker/Reference.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/broker/Reference.cpp Thu Apr 5 14:23:14 2007
@@ -46,6 +46,7 @@
}
void Reference::close() {
+ messages.clear();
registry->references.erase(getId());
}
Modified: incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.cpp?view=diff&rev=525964&r1=525963&r2=525964
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.cpp Thu Apr 5 14:23:14 2007
@@ -192,6 +192,17 @@
));
}
+// FIXME aconway 2007-04-05: Generated code should provide this.
+/**
+ * Calculate the size of a frame containing the given body type
+ * if all variable-lengths parts are empty.
+ */
+template <class T> size_t overhead() {
+ static AMQFrame frame(
+ ProtocolVersion(), 0, make_shared_ptr(new T(ProtocolVersion())));
+ return frame.size();
+}
+
void MessageMessageChannel::publish(
const Message& msg, const Exchange& exchange,
const std::string& routingKey, bool mandatory, bool immediate)
@@ -201,11 +212,35 @@
msg, exchange.getName(), routingKey, mandatory, immediate);
// Frame itself uses 8 bytes.
u_int32_t frameMax = channel.connection->getMaxFrameSize() - 8;
- if (transfer->size() > frameMax) {
- // FIXME aconway 2007-02-23:
- throw QPID_ERROR(INTERNAL_ERROR, "References not yet implemented");
+ if (transfer->size() <= frameMax) {
+ channel.sendAndReceive<MessageOkBody>(transfer);
+ }
+ else {
+ std::string ref = newTag();
+ std::string data = transfer->getBody().getValue();
+ size_t chunk =
+ channel.connection->getMaxFrameSize() -
+ (overhead<MessageAppendBody>() + ref.size());
+ // TODO aconway 2007-04-05: cast around lack of generated setters
+ const_cast<Content&>(transfer->getBody()) = Content(REFERENCE,ref);
+ channel.send(
+ make_shared_ptr(new MessageOpenBody(channel.version, ref)));
+ channel.send(transfer);
+ const char* p = data.data();
+ const char* end = data.data()+data.size();
+ while (p+chunk <= end) {
+ channel.send(
+ make_shared_ptr(
+ new MessageAppendBody(channel.version, ref, std::string(p, chunk))));
+ p += chunk;
+ }
+ if (p < end) {
+ channel.send(
+ make_shared_ptr(
+ new MessageAppendBody(channel.version, ref, std::string(p, end-p))));
+ }
+ channel.send(make_shared_ptr(new MessageCloseBody(channel.version, ref)));
}
- channel.sendAndReceive<MessageOkBody>(transfer);
}
void copy(Message& msg, MessageTransferBody& transfer) {
Modified: incubator/qpid/trunk/qpid/cpp/src/shared_ptr.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/shared_ptr.h?view=diff&rev=525964&r1=525963&r2=525964
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/shared_ptr.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/shared_ptr.h Thu Apr 5 14:23:14 2007
@@ -37,6 +37,11 @@
return shared_ptr<T>(ptr);
}
+template <class T, class D>
+shared_ptr<T> make_shared_ptr(T* ptr, D deleter) {
+ return shared_ptr<T>(ptr, deleter);
+}
+
} // namespace qpid
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp?view=diff&rev=525964&r1=525963&r2=525964
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp Thu Apr 5 14:23:14 2007
@@ -207,6 +207,8 @@
class MessageClientChannelTest : public ClientChannelTestBase {
CPPUNIT_TEST_SUITE(MessageClientChannelTest);
CPPUNIT_TEST(testPublishGet);
+ CPPUNIT_TEST(testGetNoContent);
+ CPPUNIT_TEST(testGetFragmentedMessage);
CPPUNIT_TEST_SUITE_END();
public:
MessageClientChannelTest() {