You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2010/06/25 19:25:47 UTC

svn commit: r958044 - in /qpid/trunk/qpid/cpp/src: qpid/client/amqp0_10/IncomingMessages.cpp qpid/client/amqp0_10/OutgoingMessage.cpp tests/MessagingFixture.h tests/MessagingSessionTests.cpp

Author: gsim
Date: Fri Jun 25 17:25:46 2010
New Revision: 958044

URL: http://svn.apache.org/viewvc?rev=958044&view=rev
Log:
QPID-2698: recognise special property names for amqp 0-10 specific message- and delivery-properties

Modified:
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
    qpid/trunk/qpid/cpp/src/tests/MessagingFixture.h
    qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp?rev=958044&r1=958043&r2=958044&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp Fri Jun 25 17:25:46 2010
@@ -290,6 +290,10 @@ namespace {
 //TODO: unify conversion to and from 0-10 message that is currently
 //split between IncomingMessages and OutgoingMessage
 const std::string SUBJECT("qpid.subject");
+
+const std::string X_APP_ID("x-amqp-0-10.app-id");
+const std::string X_ROUTING_KEY("x-amqp-0-10.routing-key");
+const std::string X_CONTENT_ENCODING("x-amqp-0-10.content-encoding");
 }
 
 void populateHeaders(qpid::messaging::Message& message, 
@@ -312,6 +316,21 @@ void populateHeaders(qpid::messaging::Me
         translate(messageProperties->getApplicationHeaders(), message.getProperties());
         message.setCorrelationId(messageProperties->getCorrelationId());
         message.setUserId(messageProperties->getUserId());
+        if (messageProperties->hasMessageId()) {
+            message.setMessageId(messageProperties->getMessageId().str());
+        }
+        //expose 0-10 specific items through special properties: 
+        //    app-id, content-encoding
+        if (messageProperties->hasAppId()) {
+            message.getProperties()[X_APP_ID] = messageProperties->getAppId();
+        }
+        if (messageProperties->hasContentEncoding()) {
+            message.getProperties()[X_CONTENT_ENCODING] = messageProperties->getContentEncoding();
+        }
+        //    routing-key, others?
+        if (deliveryProperties && deliveryProperties->hasRoutingKey()) {
+            message.getProperties()[X_ROUTING_KEY] = deliveryProperties->getRoutingKey();
+        }
     }
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp?rev=958044&r1=958043&r2=958044&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp Fri Jun 25 17:25:46 2010
@@ -21,10 +21,12 @@
 #include "qpid/client/amqp0_10/OutgoingMessage.h"
 #include "qpid/client/amqp0_10/AddressResolution.h"
 #include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/types/Variant.h"
 #include "qpid/messaging/Address.h"
 #include "qpid/messaging/Message.h"
 #include "qpid/messaging/MessageImpl.h"
 #include "qpid/framing/enum.h"
+#include <sstream>
 
 namespace qpid {
 namespace client {
@@ -32,9 +34,19 @@ namespace amqp0_10 {
 
 using qpid::messaging::Address;
 using qpid::messaging::MessageImplAccess;
+using qpid::types::Variant;
 using namespace qpid::framing::message;
 using namespace qpid::amqp_0_10;
 
+namespace {
+//TODO: unify conversion to and from 0-10 message that is currently
+//split between IncomingMessages and OutgoingMessage
+const std::string SUBJECT("qpid.subject");
+const std::string X_APP_ID("x-amqp-0-10.app-id");
+const std::string X_ROUTING_KEY("x-amqp-0-10.routing-key");
+const std::string X_CONTENT_ENCODING("x-amqp-0-10.content-encoding");
+}
+
 void OutgoingMessage::convert(const qpid::messaging::Message& from)
 {
     //TODO: need to avoid copying as much as possible
@@ -55,10 +67,24 @@ void OutgoingMessage::convert(const qpid
         message.getDeliveryProperties().setRedelivered(true);
     }
     if (from.getPriority()) message.getDeliveryProperties().setPriority(from.getPriority());
-}
 
-namespace {
-const std::string SUBJECT("qpid.subject");
+    //allow certain 0-10 specific items to be set through special properties: 
+    //    message-id, app-id, content-encoding
+    if (from.getMessageId().size()) {
+        qpid::framing::Uuid uuid;
+        std::istringstream data(from.getMessageId());
+        data >> uuid;
+        message.getMessageProperties().setMessageId(uuid);
+    }
+    Variant::Map::const_iterator i;
+    i = from.getProperties().find(X_APP_ID);
+    if (i != from.getProperties().end()) {
+        message.getMessageProperties().setAppId(i->second.asString());
+    }
+    i = from.getProperties().find(X_CONTENT_ENCODING);
+    if (i != from.getProperties().end()) {
+        message.getMessageProperties().setContentEncoding(i->second.asString());
+    }
 }
 
 void OutgoingMessage::setSubject(const std::string& subject)

Modified: qpid/trunk/qpid/cpp/src/tests/MessagingFixture.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingFixture.h?rev=958044&r1=958043&r2=958044&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessagingFixture.h (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessagingFixture.h Fri Jun 25 17:25:46 2010
@@ -79,6 +79,11 @@ struct BrokerAdmin
         return !result.getNotFound();
     }
 
+    void send(qpid::client::Message& message, const std::string& exchange=std::string())
+    {
+        session.messageTransfer(qpid::client::arg::destination=exchange, qpid::client::arg::content=message);
+    }
+
     ~BrokerAdmin()
     {
         session.close();

Modified: qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=958044&r1=958043&r2=958044&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp Fri Jun 25 17:25:46 2010
@@ -712,6 +712,51 @@ QPID_AUTO_TEST_CASE(testOptionVerificati
     BOOST_CHECK_THROW(fix.session.createReceiver("my-queue; {invalid-option:blah}"), qpid::messaging::AddressError);    
 }
 
+QPID_AUTO_TEST_CASE(testReceiveSpecialProperties)
+{
+    QueueFixture fix;
+
+    qpid::client::Message out;
+    out.getDeliveryProperties().setRoutingKey(fix.queue);
+    out.getMessageProperties().setAppId("my-app-id");
+    out.getMessageProperties().setMessageId(qpid::framing::Uuid(true));
+    out.getMessageProperties().setContentEncoding("my-content-encoding");
+    fix.admin.send(out);
+
+    Receiver receiver = fix.session.createReceiver(fix.queue);
+    Message in = receiver.fetch(Duration::SECOND * 5);
+    BOOST_CHECK_EQUAL(in.getProperties()["x-amqp-0-10.routing-key"].asString(), out.getDeliveryProperties().getRoutingKey());
+    BOOST_CHECK_EQUAL(in.getProperties()["x-amqp-0-10.app-id"].asString(), out.getMessageProperties().getAppId());
+    BOOST_CHECK_EQUAL(in.getProperties()["x-amqp-0-10.content-encoding"].asString(), out.getMessageProperties().getContentEncoding());
+    BOOST_CHECK_EQUAL(in.getMessageId(), out.getMessageProperties().getMessageId().str());
+    fix.session.acknowledge(true);
+}
+
+QPID_AUTO_TEST_CASE(testSendSpecialProperties)
+{
+    QueueFixture fix;
+    Sender sender = fix.session.createSender(fix.queue);
+    Message out("test-message");
+    std::string appId = "my-app-id";
+    std::string contentEncoding = "my-content-encoding";
+    out.getProperties()["x-amqp-0-10.app-id"] = appId;
+    out.getProperties()["x-amqp-0-10.content-encoding"] = contentEncoding;
+    out.setMessageId(qpid::framing::Uuid(true).str());
+    sender.send(out, true);
+
+    qpid::client::LocalQueue q;
+    qpid::client::SubscriptionManager subs(fix.admin.session);
+    qpid::client::Subscription s = subs.subscribe(q, fix.queue);
+    qpid::client::Message in = q.get();
+    s.cancel();
+    fix.admin.session.sync();
+
+    BOOST_CHECK_EQUAL(in.getMessageProperties().getAppId(), appId);
+    BOOST_CHECK_EQUAL(in.getMessageProperties().getContentEncoding(), contentEncoding);
+    BOOST_CHECK_EQUAL(in.getMessageProperties().getMessageId().str(), out.getMessageId());
+}
+
+
 QPID_AUTO_TEST_SUITE_END()
 
 }} // namespace qpid::tests



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org