You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2012/07/06 17:41:33 UTC

svn commit: r1358275 - in /qpid/trunk/qpid/cpp: include/qpid/amqp_0_10/ src/qpid/amqp_0_10/ src/qpid/client/amqp0_10/

Author: astitcher
Date: Fri Jul  6 15:41:33 2012
New Revision: 1358275

URL: http://svn.apache.org/viewvc?rev=1358275&view=rev
Log:
QPID-3883: Using application headers in messages causes a very large slowdown
Add subject to outgoing messsage before encoding it to save a round trip
decode-encode.

Modified:
    qpid/trunk/qpid/cpp/include/qpid/amqp_0_10/Codecs.h
    qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Codecs.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h

Modified: qpid/trunk/qpid/cpp/include/qpid/amqp_0_10/Codecs.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/amqp_0_10/Codecs.h?rev=1358275&r1=1358274&r2=1358275&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/amqp_0_10/Codecs.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/amqp_0_10/Codecs.h Fri Jul  6 15:41:33 2012
@@ -69,6 +69,8 @@ class QPID_COMMON_CLASS_EXTERN ListCodec
  */
 QPID_COMMON_EXTERN void translate(const qpid::types::Variant::Map& from,
                                   qpid::framing::FieldTable& to);
+QPID_COMMON_EXTERN void translate(const qpid::types::Variant::Map& from, const std::string& efield, const qpid::types::Variant& evalue,
+                                  qpid::framing::FieldTable& to);
 QPID_COMMON_EXTERN void translate(const qpid::framing::FieldTable& from,
                                   qpid::types::Variant::Map& to);
 

Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Codecs.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Codecs.cpp?rev=1358275&r1=1358274&r2=1358275&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Codecs.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Codecs.cpp Fri Jul  6 15:41:33 2012
@@ -271,6 +271,16 @@ uint32_t encodedSize(const Variant::Map&
     return size;
 }
 
+uint32_t encodedSize(const Variant::Map& values, const std::string& efield, const Variant& evalue)
+{
+    uint32_t size = 4/*size field*/ + 4/*count field*/;
+    for(Variant::Map::const_iterator i = values.begin(); i != values.end(); ++i) {
+        size += 1/*size of key*/ + (i->first).size() + 1/*typecode*/ + encodedSize(i->second);
+    }
+    size += 1/*size of key*/ + efield.size() + 1/*typecode*/ + encodedSize(evalue);
+    return size;
+}
+
 uint32_t encodedSize(const Variant::List& values)
 {
     uint32_t size = 4/*size field*/ + 4/*count field*/;
@@ -399,6 +409,21 @@ void encode(const Variant::Map& map, uin
     assert(s + len == buffer.getPosition());
 }
 
+void encode(const Variant::Map& map, const std::string& efield, const Variant& evalue, uint32_t len, qpid::framing::Buffer& buffer)
+{
+    uint32_t s = buffer.getPosition();
+    buffer.putLong(len - 4);//exclusive of the size field itself
+    buffer.putLong(map.size() + 1 /* The extra field */ );
+    for (Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) {
+        buffer.putShortString(i->first);
+        encode(i->second, buffer);
+    }
+    buffer.putShortString(efield);
+    encode(evalue, buffer);
+
+    assert(s + len == buffer.getPosition());
+}
+
 void encode(const Variant::List& list, uint32_t len, qpid::framing::Buffer& buffer)
 {
     uint32_t s = buffer.getPosition();
@@ -475,8 +500,26 @@ void translate(const Variant::Map& from,
     assert( len == buff.getPosition() );
     buff.reset();
     to.decode(buff);
+}
+
+void translate(const Variant::Map& from, const std::string& efield, const Variant& evalue, FieldTable& to)
+{
+    // Create buffer of correct size to encode Variant::Map
+    uint32_t len = encodedSize(from, efield, evalue);
+    std::vector<char> space(len);
+    qpid::framing::Buffer buff(&space[0], len);
+
+    // Encode Variant::Map into buffer directly -
+    // We pass the already calculated length in to avoid
+    // recalculating it.
+    encode(from, efield, evalue, len, buff);
 
-    //convert(from, to, &toFieldTableEntry);
+    // Give buffer to FieldTable
+    // Could speed this up a bit by avoiding copying
+    // the buffer we just created into the FieldTable
+    assert( len == buff.getPosition() );
+    buff.reset();
+    to.decode(buff);
 }
 
 void translate(const FieldTable& from, Variant::Map& to)

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp?rev=1358275&r1=1358274&r2=1358275&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp Fri Jul  6 15:41:33 2012
@@ -58,7 +58,12 @@ void OutgoingMessage::convert(const qpid
     if (address) {
         message.getMessageProperties().setReplyTo(AddressResolution::convert(address));
     }
-    translate(from.getProperties(), message.getMessageProperties().getApplicationHeaders());
+    if (!subject.empty()) {
+        Variant v(subject); v.setEncoding("utf8");
+        translate(from.getProperties(), SUBJECT, v, message.getMessageProperties().getApplicationHeaders());
+    } else {
+        translate(from.getProperties(), message.getMessageProperties().getApplicationHeaders());
+    }
     if (from.getTtl().getMilliseconds()) {
         message.getDeliveryProperties().setTtl(from.getTtl().getMilliseconds());
     }
@@ -89,16 +94,14 @@ void OutgoingMessage::convert(const qpid
     }
 }
 
-void OutgoingMessage::setSubject(const std::string& subject)
+void OutgoingMessage::setSubject(const std::string& s)
 {
-    if (!subject.empty()) {
-        message.getMessageProperties().getApplicationHeaders().setString(SUBJECT, subject);
-    }
+    subject = s;
 }
 
 std::string OutgoingMessage::getSubject() const
 {
-    return message.getMessageProperties().getApplicationHeaders().getAsString(SUBJECT);
+    return subject;
 }
 
 }}} // namespace qpid::client::amqp0_10

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h?rev=1358275&r1=1358274&r2=1358275&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h Fri Jul  6 15:41:33 2012
@@ -35,6 +35,7 @@ struct OutgoingMessage
 {
     qpid::client::Message message;
     qpid::client::Completion status;
+    std::string subject;
 
     void convert(const qpid::messaging::Message&);
     void setSubject(const std::string& subject);

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp?rev=1358275&r1=1358274&r2=1358275&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp Fri Jul  6 15:41:33 2012
@@ -37,10 +37,10 @@ SenderImpl::SenderImpl(SessionImpl& _par
 void SenderImpl::send(const qpid::messaging::Message& message, bool sync) 
 {
     if (unreliable) {           // immutable, don't need lock
-        UnreliableSend f(*this, &message);
+        UnreliableSend f(*this, message);
         parent->execute(f);
     } else {
-        Send f(*this, &message);
+        Send f(*this, message);
         while (f.repeat) parent->execute(f);
     }
     if (sync) parent->sync(true);
@@ -117,8 +117,8 @@ void SenderImpl::sendImpl(const qpid::me
 {
     sys::Mutex::ScopedLock l(lock);
     std::auto_ptr<OutgoingMessage> msg(new OutgoingMessage());
-    msg->convert(m);
     msg->setSubject(m.getSubject().empty() ? address.getSubject() : m.getSubject());
+    msg->convert(m);
     outgoing.push_back(msg.release());
     sink->send(session, name, outgoing.back());
 }
@@ -127,8 +127,8 @@ void SenderImpl::sendUnreliable(const qp
 {
     sys::Mutex::ScopedLock l(lock);
     OutgoingMessage msg;
-    msg.convert(m);
     msg.setSubject(m.getSubject().empty() ? address.getSubject() : m.getSubject());
+    msg.convert(m);
     sink->send(session, name, msg);
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h?rev=1358275&r1=1358274&r2=1358275&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h Fri Jul  6 15:41:33 2012
@@ -99,32 +99,32 @@ class SenderImpl : public qpid::messagin
 
     struct Send : Command
     {
-        const qpid::messaging::Message* message;
+        const qpid::messaging::Message& message;
         bool repeat;
 
-        Send(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m), repeat(true) {}
+        Send(SenderImpl& i, const qpid::messaging::Message& m) : Command(i), message(m), repeat(true) {}
         void operator()() 
         {
             impl.waitForCapacity();
             //from this point message will be recorded if there is any
             //failure (and replayed) so need not repeat the call
             repeat = false;
-            impl.sendImpl(*message);
+            impl.sendImpl(message);
         }
     };
 
     struct UnreliableSend : Command
     {
-        const qpid::messaging::Message* message;
+        const qpid::messaging::Message& message;
 
-        UnreliableSend(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m) {}
+        UnreliableSend(SenderImpl& i, const qpid::messaging::Message& m) : Command(i), message(m) {}
         void operator()() 
         {
             //TODO: ideally want to put messages on the outbound
             //queue and pull them off in io thread, but the old
             //0-10 client doesn't support that option so for now
             //we simply don't queue unreliable messages
-            impl.sendUnreliable(*message);                
+            impl.sendUnreliable(message);
         }
     };
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org