You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2013/09/20 17:43:34 UTC
svn commit: r1525040 - in /qpid/trunk/qpid/cpp/src/qpid:
broker/amqp/Session.cpp messaging/amqp/AddressHelper.cpp
messaging/amqp/ConnectionContext.cpp
Author: gsim
Date: Fri Sep 20 15:43:34 2013
New Revision: 1525040
URL: http://svn.apache.org/r1525040
Log:
QPID-5146: fix handling of capabilities
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1525040&r1=1525039&r2=1525040&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp Fri Sep 20 15:43:34 2013
@@ -57,15 +57,16 @@ namespace broker {
namespace amqp {
namespace {
-bool is_capability_requested(const std::string& name, pn_data_t* capabilities)
+pn_bytes_t convert(const std::string& s)
{
- pn_data_rewind(capabilities);
- while (pn_data_next(capabilities)) {
- pn_bytes_t c = pn_data_get_symbol(capabilities);
- std::string s(c.start, c.size);
- if (s == name) return true;
- }
- return false;
+ pn_bytes_t result;
+ result.start = const_cast<char*>(s.data());
+ result.size = s.size();
+ return result;
+}
+std::string convert(pn_bytes_t in)
+{
+ return std::string(in.start, in.size);
}
//capabilities
const std::string CREATE_ON_DEMAND("create-on-demand");
@@ -76,40 +77,90 @@ const std::string DIRECT_FILTER("legacy-
const std::string TOPIC_FILTER("legacy-amqp-topic-binding");
const std::string SHARED("shared");
-void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Queue> node)
+void writeCapabilities(pn_data_t* out, const std::vector<std::string>& supported)
{
- pn_data_rewind(in);
- while (pn_data_next(in)) {
- pn_bytes_t c = pn_data_get_symbol(in);
- std::string s(c.start, c.size);
- if (s == DURABLE) {
- if (node->isDurable()) pn_data_put_symbol(out, c);
- } else if (s == CREATE_ON_DEMAND || s == QUEUE || s == DIRECT_FILTER || s == TOPIC_FILTER) {
- pn_data_put_symbol(out, c);
+ if (supported.size() == 1) {
+ pn_data_put_symbol(out, convert(supported.front()));
+ } else if (supported.size() > 1) {
+ pn_data_put_array(out, false, PN_SYMBOL);
+ pn_data_enter(out);
+ for (std::vector<std::string>::const_iterator i = supported.begin(); i != supported.end(); ++i) {
+ pn_data_put_symbol(out, convert(*i));
+ }
+ pn_data_exit(out);
+ }
+}
+
+template <class F>
+void readCapabilities(pn_data_t* data, F f)
+{
+ pn_data_rewind(data);
+ if (pn_data_next(data)) {
+ pn_type_t type = pn_data_type(data);
+ if (type == PN_ARRAY) {
+ pn_data_enter(data);
+ while (pn_data_next(data)) {
+ f(convert(pn_data_get_symbol(data)));
+ }
+ pn_data_exit(data);
+ } else if (type == PN_SYMBOL) {
+ f(convert(pn_data_get_symbol(data)));
+ } else {
+ QPID_LOG(error, "Skipping capabilities field of type " << pn_type_name(type));
}
}
}
-void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Exchange> node)
+void matchCapability(const std::string& name, bool* result, const std::string& s)
{
- pn_data_rewind(in);
- while (pn_data_next(in)) {
- pn_bytes_t c = pn_data_get_symbol(in);
- std::string s(c.start, c.size);
- if (s == DURABLE) {
- if (node->isDurable()) pn_data_put_symbol(out, c);
- } else if (s == SHARED) {
- pn_data_put_symbol(out, c);
- } else if (s == CREATE_ON_DEMAND || s == TOPIC) {
- pn_data_put_symbol(out, c);
- } else if (s == DIRECT_FILTER) {
- if (node->getType() == DirectExchange::typeName) pn_data_put_symbol(out, c);
- } else if (s == TOPIC_FILTER) {
- if (node->getType() == TopicExchange::typeName) pn_data_put_symbol(out, c);
- }
+ if (s == name) *result = true;
+}
+
+bool is_capability_requested(const std::string& name, pn_data_t* capabilities)
+{
+ bool result(false);
+ readCapabilities(capabilities, boost::bind(&matchCapability, name, &result, _1));
+ return result;
+}
+
+void collectQueueCapabilities(boost::shared_ptr<Queue> node, std::vector<std::string>* supported, const std::string& s)
+{
+ if (s == DURABLE) {
+ if (node->isDurable()) supported->push_back(s);
+ } else if (s == CREATE_ON_DEMAND || s == QUEUE || s == DIRECT_FILTER || s == TOPIC_FILTER) {
+ supported->push_back(s);
+ }
+}
+
+void collectExchangeCapabilities(boost::shared_ptr<Exchange> node, std::vector<std::string>* supported, const std::string& s)
+{
+ if (s == DURABLE) {
+ if (node->isDurable()) supported->push_back(s);
+ } else if (s == SHARED) {
+ supported->push_back(s);
+ } else if (s == CREATE_ON_DEMAND || s == TOPIC) {
+ supported->push_back(s);
+ } else if (s == DIRECT_FILTER) {
+ if (node->getType() == DirectExchange::typeName) supported->push_back(s);
+ } else if (s == TOPIC_FILTER) {
+ if (node->getType() == TopicExchange::typeName) supported->push_back(s);
}
}
+void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Queue> node)
+{
+ std::vector<std::string> supported;
+ readCapabilities(in, boost::bind(&collectQueueCapabilities, node, &supported, _1));
+ writeCapabilities(out, supported);
+}
+
+void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Exchange> node)
+{
+ std::vector<std::string> supported;
+ readCapabilities(in, boost::bind(&collectExchangeCapabilities, node, &supported, _1));
+ writeCapabilities(out, supported);
+}
+
}
class IncomingToQueue : public DecodingIncoming
@@ -150,6 +201,12 @@ Session::ResolvedNode Session::resolve(c
node.queue = connection.getBroker().getQueues().find(name);
node.topic = connection.getTopics().get(name);
if (node.topic) node.exchange = node.topic->getExchange();
+ if (node.exchange && !node.queue && is_capability_requested(CREATE_ON_DEMAND, pn_terminus_capabilities(terminus))) {
+ node.properties.read(pn_terminus_properties(terminus));
+ if (!node.properties.getExchangeType().empty() && node.properties.getExchangeType() != node.exchange->getType()) {
+ throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, "Exchange of different type already exists");
+ }
+ }
if (!node.queue && !node.exchange) {
if (pn_terminus_is_dynamic(terminus) || is_capability_requested(CREATE_ON_DEMAND, pn_terminus_capabilities(terminus))) {
//is it a queue or an exchange?
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp?rev=1525040&r1=1525039&r2=1525040&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp Fri Sep 20 15:43:34 2013
@@ -426,16 +426,23 @@ void AddressHelper::checkAssertion(pn_te
QPID_LOG(debug, "checking assertions: " << capabilities);
//ensure all desired capabilities have been offered
std::set<std::string> desired;
- if (type.size()) desired.insert(type);
- if (durableNode) desired.insert(DURABLE);
for (Variant::List::const_iterator i = capabilities.begin(); i != capabilities.end(); ++i) {
- desired.insert(i->asString());
+ if (*i != CREATE_ON_DEMAND) desired.insert(i->asString());
}
pn_data_t* data = pn_terminus_capabilities(terminus);
- while (pn_data_next(data)) {
- pn_bytes_t c = pn_data_get_symbol(data);
- std::string s(c.start, c.size);
- desired.erase(s);
+ if (pn_data_next(data)) {
+ pn_type_t type = pn_data_type(data);
+ if (type == PN_ARRAY) {
+ pn_data_enter(data);
+ while (pn_data_next(data)) {
+ desired.erase(convert(pn_data_get_symbol(data)));
+ }
+ pn_data_exit(data);
+ } else if (type == PN_SYMBOL) {
+ desired.erase(convert(pn_data_get_symbol(data)));
+ } else {
+ QPID_LOG(error, "Skipping capabilities field of type " << pn_type_name(type));
+ }
}
if (desired.size()) {
@@ -614,12 +621,20 @@ void AddressHelper::configure(pn_link_t*
void AddressHelper::setCapabilities(pn_terminus_t* terminus, bool create)
{
+ if (create) capabilities.push_back(CREATE_ON_DEMAND);
+ if (!type.empty()) capabilities.push_back(type);
+ if (durableNode) capabilities.push_back(DURABLE);
+
pn_data_t* data = pn_terminus_capabilities(terminus);
- if (create) pn_data_put_symbol(data, convert(CREATE_ON_DEMAND));
- if (type.size()) pn_data_put_symbol(data, convert(type));
- if (durableNode) pn_data_put_symbol(data, convert(DURABLE));
- for (qpid::types::Variant::List::const_iterator i = capabilities.begin(); i != capabilities.end(); ++i) {
- pn_data_put_symbol(data, convert(i->asString()));
+ if (capabilities.size() == 1) {
+ pn_data_put_symbol(data, convert(capabilities.front().asString()));
+ } else if (capabilities.size() > 1) {
+ pn_data_put_array(data, false, PN_SYMBOL);
+ pn_data_enter(data);
+ for (qpid::types::Variant::List::const_iterator i = capabilities.begin(); i != capabilities.end(); ++i) {
+ pn_data_put_symbol(data, convert(i->asString()));
+ }
+ pn_data_exit(data);
}
}
std::string AddressHelper::getLinkName(const Address& address)
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1525040&r1=1525039&r2=1525040&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Fri Sep 20 15:43:34 2013
@@ -265,8 +265,8 @@ void ConnectionContext::attach(boost::sh
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
lnk->configure();
attach(lnk->sender);
- lnk->verify();
checkClosed(ssn, lnk);
+ lnk->verify();
QPID_LOG(debug, "Attach succeeded to " << lnk->getTarget());
}
@@ -275,8 +275,8 @@ void ConnectionContext::attach(boost::sh
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
lnk->configure();
attach(lnk->receiver, lnk->capacity);
- lnk->verify();
checkClosed(ssn, lnk);
+ lnk->verify();
QPID_LOG(debug, "Attach succeeded from " << lnk->getSource());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org