You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2012/11/15 15:42:23 UTC

svn commit: r1409813 - in /qpid/trunk/qpid/cpp/src/qpid: broker/amqp/ messaging/amqp/

Author: gsim
Date: Thu Nov 15 14:42:22 2012
New Revision: 1409813

URL: http://svn.apache.org/viewvc?rev=1409813&view=rev
Log:
QPID-4368: Add support for the address subject: receivers translate it into a filter (at present, a binding key), and senders use it as the default subject value

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1409813&r1=1409812&r2=1409813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp Thu Nov 15 14:42:22 2012
@@ -22,6 +22,8 @@
 #include "Outgoing.h"
 #include "Message.h"
 #include "ManagedConnection.h"
+#include "qpid/amqp/CharSequence.h"
+#include "qpid/amqp/Descriptor.h"
 #include "qpid/broker/AsyncCompletion.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/DeliverableMessage.h"
@@ -104,8 +106,76 @@ void Session::attach(pn_link_t* link)
             QueueSettings settings(false, true);
             //TODO: populate settings from source details when available from engine
             queue = broker.createQueue(name + qpid::types::Uuid(true).str(), settings, this, "", connection.getUserid(), connection.getId()).first;
-            //TODO: bind based on filter when that is exposed by engine
-            if (exchange->getType() == FanOutExchange::typeName) {
+            pn_data_t* filter = pn_terminus_filter(source);
+            pn_data_next(filter);
+            if (filter && !pn_data_is_null(filter)) {
+                if (pn_data_type(filter) == PN_MAP) {
+                    pn_data_t* echo = pn_terminus_filter(pn_link_source(link));
+                    pn_data_put_map(echo);
+                    pn_data_enter(echo);
+                    size_t count = pn_data_get_map(filter)/2;
+                    QPID_LOG(debug, "Got filter map with " << count << " entries");
+                    pn_data_enter(filter);
+                    for (size_t i = 0; i < count; i++) {
+                        pn_bytes_t fname = pn_data_get_symbol(filter);
+                        pn_data_next(filter);
+                        bool isDescribed = pn_data_is_described(filter);
+                        qpid::amqp::Descriptor descriptor(0);
+                        if (isDescribed) {
+                            pn_data_enter(filter);
+                            pn_data_next(filter);
+                            //TODO: use or at least verify descriptor
+                            if (pn_data_type(filter) == PN_ULONG) {
+                                descriptor = qpid::amqp::Descriptor(pn_data_get_ulong(filter));
+                            } else if (pn_data_type(filter) == PN_SYMBOL) {
+                                pn_bytes_t d = pn_data_get_symbol(filter);
+                                qpid::amqp::CharSequence c;
+                                c.data = d.start;
+                                c.size = d.size;
+                                descriptor = qpid::amqp::Descriptor(c);
+                            } else {
+                                QPID_LOG(notice, "Ignoring filter with descriptor with key " << std::string(fname.start, fname.size) << " and type " << pn_data_type(filter));
+                                continue;
+                            }
+                            QPID_LOG(debug, "Got filter with descriptor " << descriptor);
+                            pn_data_next(filter);
+                        } else {
+                            QPID_LOG(debug, "Got undescribed filter of type " << pn_data_type(filter));
+                        }
+                        if (pn_data_type(filter) == PN_STRING) {
+                            pn_bytes_t value = pn_data_get_string(filter);
+                            pn_data_next(filter);
+                            exchange->bind(queue, std::string(value.start, value.size), 0);
+                            pn_data_put_symbol(echo, fname);
+                            if (isDescribed) {
+                                pn_data_put_described(echo);
+                                pn_data_enter(echo);
+                                pn_bytes_t symbol;
+                                switch (descriptor.type) {
+                                  case qpid::amqp::Descriptor::NUMERIC:
+                                    pn_data_put_ulong(echo, descriptor.value.code);
+                                    break;
+                                  case qpid::amqp::Descriptor::SYMBOLIC:
+                                    symbol.start = const_cast<char*>(descriptor.value.symbol.data);
+                                    symbol.size = descriptor.value.symbol.size;
+                                    pn_data_put_symbol(echo, symbol);
+                                    break;
+                                }
+                            }
+                            pn_data_put_string(echo, value);
+                            if (isDescribed) pn_data_exit(echo);
+
+                            QPID_LOG(debug, "Binding using filter " << std::string(fname.start, fname.size) << ":" << std::string(value.start, value.size));
+                        } else {
+                            //TODO: handle headers exchange filters
+                            QPID_LOG(warning, "Ignoring unsupported filter type with key " << std::string(fname.start, fname.size) << " and type " << pn_data_type(filter));
+                        }
+                    }
+                    pn_data_exit(echo);
+                } else {
+                    QPID_LOG(warning, "Filter should be map, got type: " << pn_data_type(filter));
+                }
+            } else if (exchange->getType() == FanOutExchange::typeName) {
                 exchange->bind(queue, std::string(), 0);
             } else if (exchange->getType() == TopicExchange::typeName) {
                 exchange->bind(queue, "#", 0);

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1409813&r1=1409812&r2=1409813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Thu Nov 15 14:42:22 2012
@@ -262,10 +262,9 @@ void ConnectionContext::attach(boost::sh
 
 void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk)
 {
-    pn_terminus_t* source = pn_link_source((pn_link_t*) lnk->receiver);
-    pn_terminus_set_address(source, lnk->getSource().c_str());
-    attach(ssn->session, (pn_link_t*) lnk->receiver, lnk->capacity);
-    if (!pn_link_remote_source((pn_link_t*) lnk->receiver)) {
+    lnk->configure();
+    attach(ssn->session, lnk->receiver, lnk->capacity);
+    if (!pn_link_remote_source(lnk->receiver)) {
         std::string msg("No such source : ");
         msg += lnk->getSource();
         throw qpid::messaging::NotFound(msg);

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp?rev=1409813&r1=1409812&r2=1409813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp Thu Nov 15 14:42:22 2012
@@ -29,10 +29,10 @@ namespace qpid {
 namespace messaging {
 namespace amqp {
 //TODO: proper conversion to wide string for address
-ReceiverContext::ReceiverContext(pn_session_t* session, const std::string& n, const std::string& s)
+ReceiverContext::ReceiverContext(pn_session_t* session, const std::string& n, const qpid::messaging::Address& a)
   : name(n),
-    source(s),
-    receiver(pn_receiver(session, source.c_str())),
+    address(a),
+    receiver(pn_receiver(session, name.c_str())),
     capacity(0) {}
 ReceiverContext::~ReceiverContext()
 {
@@ -84,7 +84,36 @@ const std::string& ReceiverContext::getN
 
 const std::string& ReceiverContext::getSource() const
 {
-    return source;
+    return address.getName();
+}
+namespace {
+pn_bytes_t convert(const std::string& s)
+{
+    pn_bytes_t result;
+    result.start = const_cast<char*>(s.data());
+    result.size = s.size();
+    return result;
+}
+}
+
+void ReceiverContext::configure() const
+{
+    configure(pn_link_source(receiver));
+}
+void ReceiverContext::configure(pn_terminus_t* source) const
+{
+    pn_terminus_set_address(source, address.getName().c_str());
+
+    pn_data_t* filter = pn_terminus_filter(source);
+    pn_data_put_map(filter);
+    pn_data_enter(filter);
+    pn_data_put_symbol(filter, convert("subject"));
+    pn_data_put_described(filter);
+    pn_data_enter(filter);
+    pn_data_put_ulong(filter, 0x0000468C00000001/*LEGACY_TOPIC_FILTER*/);
+    pn_data_put_string(filter, convert(address.getSubject()));
+    pn_data_exit(filter);
+    pn_data_exit(filter);
 }
 
 bool ReceiverContext::isClosed() const

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h?rev=1409813&r1=1409812&r2=1409813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h Thu Nov 15 14:42:22 2012
@@ -21,11 +21,13 @@
  * under the License.
  *
  */
+#include "qpid/messaging/Address.h"
 #include <string>
 #include "qpid/sys/IntegerTypes.h"
 
 struct pn_link_t;
 struct pn_session_t;
+struct pn_terminus_t;
 
 namespace qpid {
 namespace messaging {
@@ -41,22 +43,25 @@ namespace amqp {
 class ReceiverContext
 {
   public:
-    ReceiverContext(pn_session_t* session, const std::string& name, const std::string& source);
+    ReceiverContext(pn_session_t* session, const std::string& name, const qpid::messaging::Address& source);
     ~ReceiverContext();
     void setCapacity(uint32_t);
     uint32_t getCapacity();
     uint32_t getAvailable();
     uint32_t getUnsettled();
+    void attach();
     void close();
     const std::string& getName() const;
     const std::string& getSource() const;
     bool isClosed() const;
+    void configure() const;
   private:
     friend class ConnectionContext;
     const std::string name;
-    const std::string source;
+    const Address address;
     pn_link_t* receiver;
     uint32_t capacity;
+    void configure(pn_terminus_t*) const;
 };
 }}} // namespace qpid::messaging::amqp
 

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp?rev=1409813&r1=1409812&r2=1409813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp Thu Nov 15 14:42:22 2012
@@ -36,10 +36,10 @@ namespace qpid {
 namespace messaging {
 namespace amqp {
 //TODO: proper conversion to wide string for address
-SenderContext::SenderContext(pn_session_t* session, const std::string& n, const std::string& t)
+SenderContext::SenderContext(pn_session_t* session, const std::string& n, const qpid::messaging::Address& a)
   : name(n),
-    target(t),
-    sender(pn_sender(session, target.c_str())), capacity(1000) {}
+    address(a),
+    sender(pn_sender(session, n.c_str())), capacity(1000) {}
 
 SenderContext::~SenderContext()
 {
@@ -74,7 +74,7 @@ const std::string& SenderContext::getNam
 
 const std::string& SenderContext::getTarget() const
 {
-    return target;
+    return address.getName();
 }
 
 SenderContext::Delivery* SenderContext::send(const qpid::messaging::Message& message)
@@ -82,7 +82,7 @@ SenderContext::Delivery* SenderContext::
     if (processUnsettled() < capacity) {
         deliveries.push_back(Delivery(nextId++));
         Delivery& delivery = deliveries.back();
-        delivery.encode(MessageImplAccess::get(message));
+        delivery.encode(MessageImplAccess::get(message), address);
         delivery.send(sender);
         return &delivery;
     } else {
@@ -135,7 +135,7 @@ const std::string EMPTY;
 class PropertiesAdapter : public qpid::amqp::MessageEncoder::Properties
 {
   public:
-    PropertiesAdapter(const qpid::messaging::MessageImpl& impl) : msg(impl) {}
+    PropertiesAdapter(const qpid::messaging::MessageImpl& impl, const std::string& s) : msg(impl), subject(s) {}
     bool hasMessageId() const
     {
         return getMessageId().size();
@@ -167,12 +167,12 @@ class PropertiesAdapter : public qpid::a
 
     bool hasSubject() const
     {
-        return getSubject().size();
+        return subject.size() || getSubject().size();
     }
 
     std::string getSubject() const
     {
-        return msg.getSubject();
+        return subject.size() ? subject : msg.getSubject();
     }
 
     bool hasReplyTo() const
@@ -266,16 +266,23 @@ class PropertiesAdapter : public qpid::a
     }
   private:
     const qpid::messaging::MessageImpl& msg;
+    const std::string subject;
 };
+
+bool changedSubject(const qpid::messaging::MessageImpl& msg, const qpid::messaging::Address& address)
+{
+    return address.getSubject().size() && address.getSubject() != msg.getSubject();
+}
+
 }
 
 SenderContext::Delivery::Delivery(int32_t i) : id(i), token(0) {}
 
-void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg)
+void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, const qpid::messaging::Address& address)
 {
     boost::shared_ptr<const EncodedMessage> original = msg.getEncoded();
 
-    if (original) { //still have the content as received, send at least the bare message unaltered
+    if (original && !changedSubject(msg, address)) { //still have the content as received, send at least the bare message unaltered
         //do we need to alter the header? are durable, priority, ttl, first-acquirer, delivery-count different from what was received?
         if (original->hasHeaderChanged(msg)) {
             //since as yet have no annotations, just write the revised header then the rest of the message as received
@@ -293,7 +300,7 @@ void SenderContext::Delivery::encode(con
         }
     } else {
         HeaderAdapter header(msg);
-        PropertiesAdapter properties(msg);
+        PropertiesAdapter properties(msg, address.getSubject());
         //compute size:
         encoded.resize(qpid::amqp::MessageEncoder::getEncodedSize(header, properties, msg.getHeaders(), msg.getBytes()));
         QPID_LOG(debug, "Sending message, buffer is " << encoded.getSize() << " bytes")

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h?rev=1409813&r1=1409812&r2=1409813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h Thu Nov 15 14:42:22 2012
@@ -25,6 +25,7 @@
 #include <string>
 #include <vector>
 #include "qpid/sys/IntegerTypes.h"
+#include "qpid/messaging/Address.h"
 #include "qpid/messaging/amqp/EncodedMessage.h"
 
 struct pn_delivery_t;
@@ -48,7 +49,7 @@ class SenderContext
     {
       public:
         Delivery(int32_t id);
-        void encode(const qpid::messaging::MessageImpl& message);
+        void encode(const qpid::messaging::MessageImpl& message, const qpid::messaging::Address&);
         void send(pn_link_t*);
         bool accepted();
       private:
@@ -57,7 +58,7 @@ class SenderContext
         EncodedMessage encoded;
     };
 
-    SenderContext(pn_session_t* session, const std::string& name, const std::string& target);
+    SenderContext(pn_session_t* session, const std::string& name, const qpid::messaging::Address& target);
     ~SenderContext();
     void close();
     void setCapacity(uint32_t);
@@ -71,7 +72,7 @@ class SenderContext
     typedef std::deque<Delivery> Deliveries;
 
     const std::string name;
-    const std::string target;
+    const qpid::messaging::Address address;
     pn_link_t* sender;
     int32_t nextId;
     Deliveries deliveries;

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp?rev=1409813&r1=1409812&r2=1409813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp Thu Nov 15 14:42:22 2012
@@ -49,7 +49,7 @@ boost::shared_ptr<SenderContext> Session
     for (SenderMap::const_iterator i = senders.find(name); i != senders.end(); i = senders.find(name)) {
         name = (boost::format("%1%_%2%") % address.getName() % ++count).str();
     }
-    boost::shared_ptr<SenderContext> s(new SenderContext(session, name, address.str()));
+    boost::shared_ptr<SenderContext> s(new SenderContext(session, name, address));
     senders[name] = s;
     return s;
 }
@@ -62,7 +62,7 @@ boost::shared_ptr<ReceiverContext> Sessi
     for (ReceiverMap::const_iterator i = receivers.find(name); i != receivers.end(); i = receivers.find(name)) {
         name = (boost::format("%1%_%2%") % address.getName() % ++count).str();
     }
-    boost::shared_ptr<ReceiverContext> r(new ReceiverContext(session, name, address.str()));
+    boost::shared_ptr<ReceiverContext> r(new ReceiverContext(session, name, address));
     receivers[name] = r;
     return r;
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org