You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2014/01/09 12:20:52 UTC

svn commit: r1556787 - in /qpid/trunk/qpid/cpp/src/qpid: amqp/Decoder.cpp broker/amqp/Incoming.cpp broker/amqp/Incoming.h

Author: gsim
Date: Thu Jan  9 11:20:51 2014
New Revision: 1556787

URL: http://svn.apache.org/r1556787
Log:
QPID-5457: support for messages composed of multiple transfers

Modified:
    qpid/trunk/qpid/cpp/src/qpid/amqp/Decoder.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.h

Modified: qpid/trunk/qpid/cpp/src/qpid/amqp/Decoder.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp/Decoder.cpp?rev=1556787&r1=1556786&r2=1556787&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp/Decoder.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp/Decoder.cpp Thu Jan  9 11:20:51 2014
@@ -301,7 +301,7 @@ Descriptor Decoder::readDescriptor()
 
 void Decoder::advance(size_t n)
 {
-    if (n > available()) throw qpid::Exception(QPID_MSG("Out of Bounds"));
+    if (n > available()) throw qpid::Exception(QPID_MSG("Out of Bounds: requested advance of " << n << " at " << position << " but only " << available() << " available"));
     position += n;
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp?rev=1556787&r1=1556786&r2=1556787&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp Thu Jan  9 11:20:51 2014
@@ -27,6 +27,7 @@
 #include "qpid/broker/AsyncCompletion.h"
 #include "qpid/broker/Message.h"
 #include "qpid/broker/Broker.h"
+#include "qpid/log/Statement.h"
 
 namespace qpid {
 namespace broker {
@@ -111,19 +112,38 @@ DecodingIncoming::~DecodingIncoming() {}
 
 void DecodingIncoming::readable(pn_delivery_t* delivery)
 {
-    boost::intrusive_ptr<Message> received(new Message(pn_delivery_pending(delivery)));
-    /*ssize_t read = */pn_link_recv(link, received->getData(), received->getSize());
-    received->scan();
-    pn_link_advance(link);
-
-    qpid::broker::Message message(received, received);
-    message.setPublisher(session->getParent());
-    userid.verify(message.getUserId());
-    message.computeExpiration(expiryPolicy);
-    handle(message);
-    --window;
-    received->begin();
-    Transfer t(delivery, session);
-    received->end(t);
+    size_t pending = pn_delivery_pending(delivery);
+    size_t offset = partial ? partial->getSize() : 0;
+    boost::intrusive_ptr<Message> received(new Message(offset + pending));
+    if (partial) {
+        ::memcpy(received->getData(), partial->getData(), offset);
+        partial = boost::intrusive_ptr<Message>();
+    }
+    assert(received->getSize() == pending + offset);
+    pn_link_recv(link, received->getData() + offset, pending);
+
+    if (pn_delivery_partial(delivery)) {
+        QPID_LOG(debug, "Message incomplete: received " << pending << " bytes, now have " << received->getSize());
+        partial = received;
+    } else {
+        if (offset) {
+            QPID_LOG(debug, "Message complete: received " << pending << " bytes, " << received->getSize() << " in total");
+        } else {
+            QPID_LOG(debug, "Message received: " << received->getSize() << " bytes");
+        }
+
+        received->scan();
+        pn_link_advance(link);
+
+        qpid::broker::Message message(received, received);
+        message.setPublisher(session->getParent());
+        userid.verify(message.getUserId());
+        message.computeExpiration(expiryPolicy);
+        handle(message);
+        --window;
+        received->begin();
+        Transfer t(delivery, session);
+        received->end(t);
+    }
 }
 }}} // namespace qpid::broker::amqp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.h?rev=1556787&r1=1556786&r2=1556787&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.h Thu Jan  9 11:20:51 2014
@@ -78,6 +78,7 @@ class DecodingIncoming : public Incoming
   private:
     boost::shared_ptr<Session> session;
     boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
+    boost::intrusive_ptr<Message> partial;
 };
 
 }}} // namespace qpid::broker::amqp



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org