You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/04/05 23:23:15 UTC

svn commit: r525964 - in /incubator/qpid/trunk/qpid/cpp/src: broker/BrokerMessageMessage.cpp broker/BrokerMessageMessage.h broker/Reference.cpp client/MessageMessageChannel.cpp shared_ptr.h tests/ClientChannelTest.cpp

Author: aconway
Date: Thu Apr  5 14:23:14 2007
New Revision: 525964

URL: http://svn.apache.org/viewvc?view=rev&rev=525964
Log:

* cpp/src/broker/BrokerMessageMessage.h: Change reference from weak_ptr to
  shared_ptr. Broker messages hold their reference.
* cpp/src/broker/Reference.cpp (close): clear messages array to break
  shared_ptr cycle and avoid a leak.
* cpp/src/client/MessageMessageChannel.cpp (publish): Support references
  for large messages.
* cpp/src/shared_ptr.h (make_shared_ptr): added deleter variant.
* cpp/src/tests/ClientChannelTest.cpp: Enabled testGetNoContent,
  testGetFragmentedMessage

Modified:
    incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.cpp
    incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.h
    incubator/qpid/trunk/qpid/cpp/src/broker/Reference.cpp
    incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.cpp
    incubator/qpid/trunk/qpid/cpp/src/shared_ptr.h
    incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.cpp?view=diff&rev=525964&r1=525963&r2=525964
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.cpp Thu Apr  5 14:23:14 2007
@@ -312,7 +312,7 @@
 }
 
 MessageMessage::ReferencePtr MessageMessage::getReference() const {
-    return reference.lock();
+    return reference;
 }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.h?view=diff&rev=525964&r1=525963&r2=525964
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageMessage.h Thu Apr  5 14:23:14 2007
@@ -93,7 +93,7 @@
   
     framing::RequestId requestId;
     const TransferPtr transfer;
-    const boost::weak_ptr<Reference> reference;
+    const boost::shared_ptr<Reference> reference;
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/broker/Reference.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/broker/Reference.cpp?view=diff&rev=525964&r1=525963&r2=525964
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/broker/Reference.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/broker/Reference.cpp Thu Apr  5 14:23:14 2007
@@ -46,6 +46,7 @@
 }
 
 void Reference::close() {
+    messages.clear();
     registry->references.erase(getId());
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.cpp?view=diff&rev=525964&r1=525963&r2=525964
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.cpp Thu Apr  5 14:23:14 2007
@@ -192,6 +192,17 @@
         ));
 }
 
+// FIXME aconway 2007-04-05: Generated code should provide this.
+/**
+ * Calculate the size of a frame containing the given body type
+ * if all variable-lengths parts are empty.
+ */
+template <class T> size_t overhead() {
+    static AMQFrame frame(
+        ProtocolVersion(), 0, make_shared_ptr(new T(ProtocolVersion())));
+    return frame.size();
+}
+
 void MessageMessageChannel::publish(
     const Message& msg, const Exchange& exchange,
     const std::string& routingKey, bool mandatory, bool immediate)
@@ -201,11 +212,35 @@
         msg, exchange.getName(), routingKey, mandatory, immediate);
     // Frame itself uses 8 bytes.
     u_int32_t frameMax = channel.connection->getMaxFrameSize() - 8;
-    if (transfer->size()  > frameMax) {
-        // FIXME aconway 2007-02-23: 
-        throw QPID_ERROR(INTERNAL_ERROR, "References not yet implemented");
+    if (transfer->size()  <= frameMax) {
+        channel.sendAndReceive<MessageOkBody>(transfer);
+    }
+    else {
+        std::string ref = newTag();
+        std::string data = transfer->getBody().getValue();
+        size_t chunk =
+            channel.connection->getMaxFrameSize() -
+            (overhead<MessageAppendBody>() + ref.size());
+        // TODO aconway 2007-04-05: cast around lack of generated setters
+        const_cast<Content&>(transfer->getBody()) = Content(REFERENCE,ref);
+        channel.send(
+            make_shared_ptr(new MessageOpenBody(channel.version, ref)));
+        channel.send(transfer);
+        const char* p = data.data();
+        const char* end = data.data()+data.size();
+        while (p+chunk <= end) {
+            channel.send(
+                make_shared_ptr(
+                    new MessageAppendBody(channel.version, ref, std::string(p, chunk))));
+            p += chunk;
+        }
+        if (p < end) {
+            channel.send(
+                make_shared_ptr(
+                    new MessageAppendBody(channel.version, ref, std::string(p, end-p))));
+        }
+        channel.send(make_shared_ptr(new MessageCloseBody(channel.version, ref)));
     }
-    channel.sendAndReceive<MessageOkBody>(transfer);
 }
         
 void copy(Message& msg, MessageTransferBody& transfer) {

Modified: incubator/qpid/trunk/qpid/cpp/src/shared_ptr.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/shared_ptr.h?view=diff&rev=525964&r1=525963&r2=525964
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/shared_ptr.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/shared_ptr.h Thu Apr  5 14:23:14 2007
@@ -37,6 +37,11 @@
     return shared_ptr<T>(ptr);
 }
 
+template <class T, class D>
+shared_ptr<T> make_shared_ptr(T* ptr, D deleter) {
+    return shared_ptr<T>(ptr, deleter);
+}
+
 } // namespace qpid
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp?view=diff&rev=525964&r1=525963&r2=525964
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp Thu Apr  5 14:23:14 2007
@@ -207,6 +207,8 @@
 class MessageClientChannelTest : public ClientChannelTestBase {
     CPPUNIT_TEST_SUITE(MessageClientChannelTest);
     CPPUNIT_TEST(testPublishGet);
+    CPPUNIT_TEST(testGetNoContent);
+    CPPUNIT_TEST(testGetFragmentedMessage);
     CPPUNIT_TEST_SUITE_END();
   public:
     MessageClientChannelTest() {