You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2014/01/09 12:20:52 UTC
svn commit: r1556787 - in /qpid/trunk/qpid/cpp/src/qpid: amqp/Decoder.cpp broker/amqp/Incoming.cpp broker/amqp/Incoming.h
Author: gsim
Date: Thu Jan 9 11:20:51 2014
New Revision: 1556787
URL: http://svn.apache.org/r1556787
Log:
QPID-5457: support for messages composed of multiple transfers
Modified:
qpid/trunk/qpid/cpp/src/qpid/amqp/Decoder.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.h
Modified: qpid/trunk/qpid/cpp/src/qpid/amqp/Decoder.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp/Decoder.cpp?rev=1556787&r1=1556786&r2=1556787&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp/Decoder.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp/Decoder.cpp Thu Jan 9 11:20:51 2014
@@ -301,7 +301,7 @@ Descriptor Decoder::readDescriptor()
void Decoder::advance(size_t n)
{
- if (n > available()) throw qpid::Exception(QPID_MSG("Out of Bounds"));
+ if (n > available()) throw qpid::Exception(QPID_MSG("Out of Bounds: requested advance of " << n << " at " << position << " but only " << available() << " available"));
position += n;
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp?rev=1556787&r1=1556786&r2=1556787&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp Thu Jan 9 11:20:51 2014
@@ -27,6 +27,7 @@
#include "qpid/broker/AsyncCompletion.h"
#include "qpid/broker/Message.h"
#include "qpid/broker/Broker.h"
+#include "qpid/log/Statement.h"
namespace qpid {
namespace broker {
@@ -111,19 +112,38 @@ DecodingIncoming::~DecodingIncoming() {}
void DecodingIncoming::readable(pn_delivery_t* delivery)
{
- boost::intrusive_ptr<Message> received(new Message(pn_delivery_pending(delivery)));
- /*ssize_t read = */pn_link_recv(link, received->getData(), received->getSize());
- received->scan();
- pn_link_advance(link);
-
- qpid::broker::Message message(received, received);
- message.setPublisher(session->getParent());
- userid.verify(message.getUserId());
- message.computeExpiration(expiryPolicy);
- handle(message);
- --window;
- received->begin();
- Transfer t(delivery, session);
- received->end(t);
+ size_t pending = pn_delivery_pending(delivery);
+ size_t offset = partial ? partial->getSize() : 0;
+ boost::intrusive_ptr<Message> received(new Message(offset + pending));
+ if (partial) {
+ ::memcpy(received->getData(), partial->getData(), offset);
+ partial = boost::intrusive_ptr<Message>();
+ }
+ assert(received->getSize() == pending + offset);
+ pn_link_recv(link, received->getData() + offset, pending);
+
+ if (pn_delivery_partial(delivery)) {
+ QPID_LOG(debug, "Message incomplete: received " << pending << " bytes, now have " << received->getSize());
+ partial = received;
+ } else {
+ if (offset) {
+ QPID_LOG(debug, "Message complete: received " << pending << " bytes, " << received->getSize() << " in total");
+ } else {
+ QPID_LOG(debug, "Message received: " << received->getSize() << " bytes");
+ }
+
+ received->scan();
+ pn_link_advance(link);
+
+ qpid::broker::Message message(received, received);
+ message.setPublisher(session->getParent());
+ userid.verify(message.getUserId());
+ message.computeExpiration(expiryPolicy);
+ handle(message);
+ --window;
+ received->begin();
+ Transfer t(delivery, session);
+ received->end(t);
+ }
}
}}} // namespace qpid::broker::amqp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.h?rev=1556787&r1=1556786&r2=1556787&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.h Thu Jan 9 11:20:51 2014
@@ -78,6 +78,7 @@ class DecodingIncoming : public Incoming
private:
boost::shared_ptr<Session> session;
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
+ boost::intrusive_ptr<Message> partial;
};
}}} // namespace qpid::broker::amqp
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org