You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2013/06/12 19:56:20 UTC
svn commit: r1492311 - in /qpid/trunk/qpid/cpp/src/qpid/broker/amqp:
Outgoing.cpp Outgoing.h Session.cpp
Author: gsim
Date: Wed Jun 12 17:56:20 2013
New Revision: 1492311
URL: http://svn.apache.org/r1492311
Log:
QPID-4917: allow shared topic subscriptions
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp?rev=1492311&r1=1492310&r2=1492311&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp Wed Jun 12 17:56:20 2013
@@ -41,10 +41,10 @@ void Outgoing::wakeup()
session.wakeup();
}
-OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session& session, qpid::sys::OutputControl& o, bool topic)
+OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session& session, qpid::sys::OutputControl& o, bool e)
: Outgoing(broker, session, source, target, pn_link_name(l)),
Consumer(pn_link_name(l), /*FIXME*/CONSUMER),
- exclusive(topic),
+ exclusive(e),
queue(q), deliveries(5000), link(l), out(o),
current(0), outstanding(0),
buffer(1024)/*used only for header at present*/
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h?rev=1492311&r1=1492310&r2=1492311&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h Wed Jun 12 17:56:20 2013
@@ -88,7 +88,7 @@ class Outgoing : public ManagedOutgoingL
class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public boost::enable_shared_from_this<OutgoingFromQueue>
{
public:
- OutgoingFromQueue(Broker&, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session&, qpid::sys::OutputControl& o, bool topic);
+ OutgoingFromQueue(Broker&, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session&, qpid::sys::OutputControl& o, bool exclusive);
void setSubjectFilter(const std::string&);
void setSelectorFilter(const std::string&);
void init();
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1492311&r1=1492310&r2=1492311&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp Wed Jun 12 17:56:20 2013
@@ -55,6 +55,7 @@ namespace amqp {
namespace {
bool is_capability_requested(const std::string& name, pn_data_t* capabilities)
{
+ pn_data_rewind(capabilities);
while (pn_data_next(capabilities)) {
pn_bytes_t c = pn_data_get_symbol(capabilities);
std::string s(c.start, c.size);
@@ -69,9 +70,11 @@ const std::string QUEUE("queue");
const std::string TOPIC("topic");
const std::string DIRECT_FILTER("legacy-amqp-direct-binding");
const std::string TOPIC_FILTER("legacy-amqp-topic-binding");
+const std::string SHARED("shared");
void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Queue> node)
{
+ pn_data_rewind(in);
while (pn_data_next(in)) {
pn_bytes_t c = pn_data_get_symbol(in);
std::string s(c.start, c.size);
@@ -85,11 +88,14 @@ void setCapabilities(pn_data_t* in, pn_d
void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Exchange> node)
{
+ pn_data_rewind(in);
while (pn_data_next(in)) {
pn_bytes_t c = pn_data_get_symbol(in);
std::string s(c.start, c.size);
if (s == DURABLE) {
if (node->isDurable()) pn_data_put_symbol(out, c);
+ } else if (s == SHARED) {
+ pn_data_put_symbol(out, c);
} else if (s == CREATE_ON_DEMAND || s == TOPIC) {
pn_data_put_symbol(out, c);
} else if (s == DIRECT_FILTER) {
@@ -281,18 +287,27 @@ void Session::setupOutgoing(pn_link_t* l
filter.apply(q);
outgoing[link] = q;
} else if (node.exchange) {
+ bool shared = is_capability_requested(SHARED, pn_terminus_capabilities(source));
bool durable = pn_terminus_get_durability(source);
QueueSettings settings(durable, !durable);
filter.configure(settings);
//TODO: populate settings from source details when available from engine
- std::stringstream queueName;//combination of container id and link name is unique
- queueName << connection.getContainerId() << "_" << pn_link_name(link);
+ std::stringstream queueName;
+ if (shared) {
+ //just use link name (TODO: could allow this to be
+ //overridden when acces to link properties is provided
+ //(PROTON-335))
+ queueName << pn_link_name(link);
+ } else {
+ //combination of container id and link name is unique
+ queueName << connection.getContainerId() << "_" << pn_link_name(link);
+ }
boost::shared_ptr<qpid::broker::Queue> queue
= broker.createQueue(queueName.str(), settings, this, "", connection.getUserid(), connection.getId()).first;
- queue->setExclusiveOwner(this);
+ if (!shared) queue->setExclusiveOwner(this);
filter.bind(node.exchange, queue);
- boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, target, queue, link, *this, out, true));
+ boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, target, queue, link, *this, out, !shared));
outgoing[link] = q;
q->init();
} else if (node.relay) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org