You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2013/06/12 19:56:20 UTC

svn commit: r1492311 - in /qpid/trunk/qpid/cpp/src/qpid/broker/amqp: Outgoing.cpp Outgoing.h Session.cpp

Author: gsim
Date: Wed Jun 12 17:56:20 2013
New Revision: 1492311

URL: http://svn.apache.org/r1492311
Log:
QPID-4917: allow shared topic subscriptions

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp?rev=1492311&r1=1492310&r2=1492311&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp Wed Jun 12 17:56:20 2013
@@ -41,10 +41,10 @@ void Outgoing::wakeup()
     session.wakeup();
 }
 
-OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session& session, qpid::sys::OutputControl& o, bool topic)
+OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session& session, qpid::sys::OutputControl& o, bool e)
     : Outgoing(broker, session, source, target, pn_link_name(l)),
       Consumer(pn_link_name(l), /*FIXME*/CONSUMER),
-      exclusive(topic),
+      exclusive(e),
       queue(q), deliveries(5000), link(l), out(o),
       current(0), outstanding(0),
       buffer(1024)/*used only for header at present*/

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h?rev=1492311&r1=1492310&r2=1492311&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h Wed Jun 12 17:56:20 2013
@@ -88,7 +88,7 @@ class Outgoing : public ManagedOutgoingL
 class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public boost::enable_shared_from_this<OutgoingFromQueue>
 {
   public:
-    OutgoingFromQueue(Broker&, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session&, qpid::sys::OutputControl& o, bool topic);
+    OutgoingFromQueue(Broker&, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session&, qpid::sys::OutputControl& o, bool exclusive);
     void setSubjectFilter(const std::string&);
     void setSelectorFilter(const std::string&);
     void init();

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1492311&r1=1492310&r2=1492311&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp Wed Jun 12 17:56:20 2013
@@ -55,6 +55,7 @@ namespace amqp {
 namespace {
 bool is_capability_requested(const std::string& name, pn_data_t* capabilities)
 {
+    pn_data_rewind(capabilities);
     while (pn_data_next(capabilities)) {
         pn_bytes_t c = pn_data_get_symbol(capabilities);
         std::string s(c.start, c.size);
@@ -69,9 +70,11 @@ const std::string QUEUE("queue");
 const std::string TOPIC("topic");
 const std::string DIRECT_FILTER("legacy-amqp-direct-binding");
 const std::string TOPIC_FILTER("legacy-amqp-topic-binding");
+const std::string SHARED("shared");
 
 void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Queue> node)
 {
+    pn_data_rewind(in);
     while (pn_data_next(in)) {
         pn_bytes_t c = pn_data_get_symbol(in);
         std::string s(c.start, c.size);
@@ -85,11 +88,14 @@ void setCapabilities(pn_data_t* in, pn_d
 
 void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Exchange> node)
 {
+    pn_data_rewind(in);
     while (pn_data_next(in)) {
         pn_bytes_t c = pn_data_get_symbol(in);
         std::string s(c.start, c.size);
         if (s == DURABLE) {
             if (node->isDurable()) pn_data_put_symbol(out, c);
+        } else if (s == SHARED) {
+            pn_data_put_symbol(out, c);
         } else if (s == CREATE_ON_DEMAND || s == TOPIC) {
             pn_data_put_symbol(out, c);
         } else if (s == DIRECT_FILTER) {
@@ -281,18 +287,27 @@ void Session::setupOutgoing(pn_link_t* l
         filter.apply(q);
         outgoing[link] = q;
     } else if (node.exchange) {
+        bool shared = is_capability_requested(SHARED, pn_terminus_capabilities(source));
         bool durable = pn_terminus_get_durability(source);
         QueueSettings settings(durable, !durable);
         filter.configure(settings);
         //TODO: populate settings from source details when available from engine
-        std::stringstream queueName;//combination of container id and link name is unique
-        queueName << connection.getContainerId() << "_" << pn_link_name(link);
+        std::stringstream queueName;
+        if (shared) {
+            //just use link name (TODO: could allow this to be
+            //overridden when access to link properties is provided
+            //(PROTON-335))
+            queueName << pn_link_name(link);
+        } else {
+            //combination of container id and link name is unique
+            queueName << connection.getContainerId() << "_" << pn_link_name(link);
+        }
         boost::shared_ptr<qpid::broker::Queue> queue
             = broker.createQueue(queueName.str(), settings, this, "", connection.getUserid(), connection.getId()).first;
-        queue->setExclusiveOwner(this);
+        if (!shared) queue->setExclusiveOwner(this);
 
         filter.bind(node.exchange, queue);
-        boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, target, queue, link, *this, out, true));
+        boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, target, queue, link, *this, out, !shared));
         outgoing[link] = q;
         q->init();
     } else if (node.relay) {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org