You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2012/07/06 17:41:33 UTC
svn commit: r1358275 - in /qpid/trunk/qpid/cpp: include/qpid/amqp_0_10/
src/qpid/amqp_0_10/ src/qpid/client/amqp0_10/
Author: astitcher
Date: Fri Jul 6 15:41:33 2012
New Revision: 1358275
URL: http://svn.apache.org/viewvc?rev=1358275&view=rev
Log:
QPID-3883: Using application headers in messages causes a very large slowdown
Add subject to outgoing message before encoding it to save a
decode-encode round trip.
Modified:
qpid/trunk/qpid/cpp/include/qpid/amqp_0_10/Codecs.h
qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Codecs.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
Modified: qpid/trunk/qpid/cpp/include/qpid/amqp_0_10/Codecs.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/amqp_0_10/Codecs.h?rev=1358275&r1=1358274&r2=1358275&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/amqp_0_10/Codecs.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/amqp_0_10/Codecs.h Fri Jul 6 15:41:33 2012
@@ -69,6 +69,8 @@ class QPID_COMMON_CLASS_EXTERN ListCodec
*/
QPID_COMMON_EXTERN void translate(const qpid::types::Variant::Map& from,
qpid::framing::FieldTable& to);
+QPID_COMMON_EXTERN void translate(const qpid::types::Variant::Map& from, const std::string& efield, const qpid::types::Variant& evalue,
+ qpid::framing::FieldTable& to);
QPID_COMMON_EXTERN void translate(const qpid::framing::FieldTable& from,
qpid::types::Variant::Map& to);
Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Codecs.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Codecs.cpp?rev=1358275&r1=1358274&r2=1358275&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Codecs.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Codecs.cpp Fri Jul 6 15:41:33 2012
@@ -271,6 +271,16 @@ uint32_t encodedSize(const Variant::Map&
return size;
}
+uint32_t encodedSize(const Variant::Map& values, const std::string& efield, const Variant& evalue)
+{
+ uint32_t size = 4/*size field*/ + 4/*count field*/;
+ for(Variant::Map::const_iterator i = values.begin(); i != values.end(); ++i) {
+ size += 1/*size of key*/ + (i->first).size() + 1/*typecode*/ + encodedSize(i->second);
+ }
+ size += 1/*size of key*/ + efield.size() + 1/*typecode*/ + encodedSize(evalue);
+ return size;
+}
+
uint32_t encodedSize(const Variant::List& values)
{
uint32_t size = 4/*size field*/ + 4/*count field*/;
@@ -399,6 +409,21 @@ void encode(const Variant::Map& map, uin
assert(s + len == buffer.getPosition());
}
+void encode(const Variant::Map& map, const std::string& efield, const Variant& evalue, uint32_t len, qpid::framing::Buffer& buffer)
+{
+ uint32_t s = buffer.getPosition();
+ buffer.putLong(len - 4);//exclusive of the size field itself
+ buffer.putLong(map.size() + 1 /* The extra field */ );
+ for (Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) {
+ buffer.putShortString(i->first);
+ encode(i->second, buffer);
+ }
+ buffer.putShortString(efield);
+ encode(evalue, buffer);
+
+ assert(s + len == buffer.getPosition());
+}
+
void encode(const Variant::List& list, uint32_t len, qpid::framing::Buffer& buffer)
{
uint32_t s = buffer.getPosition();
@@ -475,8 +500,26 @@ void translate(const Variant::Map& from,
assert( len == buff.getPosition() );
buff.reset();
to.decode(buff);
+}
+
+void translate(const Variant::Map& from, const std::string& efield, const Variant& evalue, FieldTable& to)
+{
+ // Create buffer of correct size to encode Variant::Map
+ uint32_t len = encodedSize(from, efield, evalue);
+ std::vector<char> space(len);
+ qpid::framing::Buffer buff(&space[0], len);
+
+ // Encode Variant::Map into buffer directly -
+ // We pass the already calculated length in to avoid
+ // recalculating it.
+ encode(from, efield, evalue, len, buff);
- //convert(from, to, &toFieldTableEntry);
+ // Give buffer to FieldTable
+ // Could speed this up a bit by avoiding copying
+ // the buffer we just created into the FieldTable
+ assert( len == buff.getPosition() );
+ buff.reset();
+ to.decode(buff);
}
void translate(const FieldTable& from, Variant::Map& to)
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp?rev=1358275&r1=1358274&r2=1358275&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp Fri Jul 6 15:41:33 2012
@@ -58,7 +58,12 @@ void OutgoingMessage::convert(const qpid
if (address) {
message.getMessageProperties().setReplyTo(AddressResolution::convert(address));
}
- translate(from.getProperties(), message.getMessageProperties().getApplicationHeaders());
+ if (!subject.empty()) {
+ Variant v(subject); v.setEncoding("utf8");
+ translate(from.getProperties(), SUBJECT, v, message.getMessageProperties().getApplicationHeaders());
+ } else {
+ translate(from.getProperties(), message.getMessageProperties().getApplicationHeaders());
+ }
if (from.getTtl().getMilliseconds()) {
message.getDeliveryProperties().setTtl(from.getTtl().getMilliseconds());
}
@@ -89,16 +94,14 @@ void OutgoingMessage::convert(const qpid
}
}
-void OutgoingMessage::setSubject(const std::string& subject)
+void OutgoingMessage::setSubject(const std::string& s)
{
- if (!subject.empty()) {
- message.getMessageProperties().getApplicationHeaders().setString(SUBJECT, subject);
- }
+ subject = s;
}
std::string OutgoingMessage::getSubject() const
{
- return message.getMessageProperties().getApplicationHeaders().getAsString(SUBJECT);
+ return subject;
}
}}} // namespace qpid::client::amqp0_10
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h?rev=1358275&r1=1358274&r2=1358275&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h Fri Jul 6 15:41:33 2012
@@ -35,6 +35,7 @@ struct OutgoingMessage
{
qpid::client::Message message;
qpid::client::Completion status;
+ std::string subject;
void convert(const qpid::messaging::Message&);
void setSubject(const std::string& subject);
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp?rev=1358275&r1=1358274&r2=1358275&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp Fri Jul 6 15:41:33 2012
@@ -37,10 +37,10 @@ SenderImpl::SenderImpl(SessionImpl& _par
void SenderImpl::send(const qpid::messaging::Message& message, bool sync)
{
if (unreliable) { // immutable, don't need lock
- UnreliableSend f(*this, &message);
+ UnreliableSend f(*this, message);
parent->execute(f);
} else {
- Send f(*this, &message);
+ Send f(*this, message);
while (f.repeat) parent->execute(f);
}
if (sync) parent->sync(true);
@@ -117,8 +117,8 @@ void SenderImpl::sendImpl(const qpid::me
{
sys::Mutex::ScopedLock l(lock);
std::auto_ptr<OutgoingMessage> msg(new OutgoingMessage());
- msg->convert(m);
msg->setSubject(m.getSubject().empty() ? address.getSubject() : m.getSubject());
+ msg->convert(m);
outgoing.push_back(msg.release());
sink->send(session, name, outgoing.back());
}
@@ -127,8 +127,8 @@ void SenderImpl::sendUnreliable(const qp
{
sys::Mutex::ScopedLock l(lock);
OutgoingMessage msg;
- msg.convert(m);
msg.setSubject(m.getSubject().empty() ? address.getSubject() : m.getSubject());
+ msg.convert(m);
sink->send(session, name, msg);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h?rev=1358275&r1=1358274&r2=1358275&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h Fri Jul 6 15:41:33 2012
@@ -99,32 +99,32 @@ class SenderImpl : public qpid::messagin
struct Send : Command
{
- const qpid::messaging::Message* message;
+ const qpid::messaging::Message& message;
bool repeat;
- Send(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m), repeat(true) {}
+ Send(SenderImpl& i, const qpid::messaging::Message& m) : Command(i), message(m), repeat(true) {}
void operator()()
{
impl.waitForCapacity();
//from this point message will be recorded if there is any
//failure (and replayed) so need not repeat the call
repeat = false;
- impl.sendImpl(*message);
+ impl.sendImpl(message);
}
};
struct UnreliableSend : Command
{
- const qpid::messaging::Message* message;
+ const qpid::messaging::Message& message;
- UnreliableSend(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m) {}
+ UnreliableSend(SenderImpl& i, const qpid::messaging::Message& m) : Command(i), message(m) {}
void operator()()
{
//TODO: ideally want to put messages on the outbound
//queue and pull them off in io thread, but the old
//0-10 client doesn't support that option so for now
//we simply don't queue unreliable messages
- impl.sendUnreliable(*message);
+ impl.sendUnreliable(message);
}
};
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org