You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2013/09/20 17:43:34 UTC

svn commit: r1525040 - in /qpid/trunk/qpid/cpp/src/qpid: broker/amqp/Session.cpp messaging/amqp/AddressHelper.cpp messaging/amqp/ConnectionContext.cpp

Author: gsim
Date: Fri Sep 20 15:43:34 2013
New Revision: 1525040

URL: http://svn.apache.org/r1525040
Log:
QPID-5146: fix handling of capabilities

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1525040&r1=1525039&r2=1525040&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp Fri Sep 20 15:43:34 2013
@@ -57,15 +57,16 @@ namespace broker {
 namespace amqp {
 
 namespace {
-bool is_capability_requested(const std::string& name, pn_data_t* capabilities)
+pn_bytes_t convert(const std::string& s)
 {
-    pn_data_rewind(capabilities);
-    while (pn_data_next(capabilities)) {
-        pn_bytes_t c = pn_data_get_symbol(capabilities);
-        std::string s(c.start, c.size);
-        if (s == name) return true;
-    }
-    return false;
+    pn_bytes_t result;
+    result.start = const_cast<char*>(s.data());
+    result.size = s.size();
+    return result;
+}
+std::string convert(pn_bytes_t in)
+{
+    return std::string(in.start, in.size);
 }
 //capabilities
 const std::string CREATE_ON_DEMAND("create-on-demand");
@@ -76,40 +77,90 @@ const std::string DIRECT_FILTER("legacy-
 const std::string TOPIC_FILTER("legacy-amqp-topic-binding");
 const std::string SHARED("shared");
 
-void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Queue> node)
+void writeCapabilities(pn_data_t* out, const std::vector<std::string>& supported)
 {
-    pn_data_rewind(in);
-    while (pn_data_next(in)) {
-        pn_bytes_t c = pn_data_get_symbol(in);
-        std::string s(c.start, c.size);
-        if (s == DURABLE) {
-            if (node->isDurable()) pn_data_put_symbol(out, c);
-        } else if (s == CREATE_ON_DEMAND || s == QUEUE || s == DIRECT_FILTER || s == TOPIC_FILTER) {
-            pn_data_put_symbol(out, c);
+    if (supported.size() == 1) {
+        pn_data_put_symbol(out, convert(supported.front()));
+    } else if (supported.size() > 1) {
+        pn_data_put_array(out, false, PN_SYMBOL);
+        pn_data_enter(out);
+        for (std::vector<std::string>::const_iterator i = supported.begin(); i != supported.end(); ++i) {
+            pn_data_put_symbol(out, convert(*i));
+        }
+        pn_data_exit(out);
+    }
+}
+
+template <class F>
+void readCapabilities(pn_data_t* data, F f)
+{
+    pn_data_rewind(data);
+    if (pn_data_next(data)) {
+        pn_type_t type = pn_data_type(data);
+        if (type == PN_ARRAY) {
+            pn_data_enter(data);
+            while (pn_data_next(data)) {
+                f(convert(pn_data_get_symbol(data)));
+            }
+            pn_data_exit(data);
+        } else if (type == PN_SYMBOL) {
+            f(convert(pn_data_get_symbol(data)));
+        } else {
+            QPID_LOG(error, "Skipping capabilities field of type " << pn_type_name(type));
         }
     }
 }
 
-void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Exchange> node)
+void matchCapability(const std::string& name, bool* result, const std::string& s)
 {
-    pn_data_rewind(in);
-    while (pn_data_next(in)) {
-        pn_bytes_t c = pn_data_get_symbol(in);
-        std::string s(c.start, c.size);
-        if (s == DURABLE) {
-            if (node->isDurable()) pn_data_put_symbol(out, c);
-        } else if (s == SHARED) {
-            pn_data_put_symbol(out, c);
-        } else if (s == CREATE_ON_DEMAND || s == TOPIC) {
-            pn_data_put_symbol(out, c);
-        } else if (s == DIRECT_FILTER) {
-            if (node->getType() == DirectExchange::typeName) pn_data_put_symbol(out, c);
-        } else if (s == TOPIC_FILTER) {
-            if (node->getType() == TopicExchange::typeName) pn_data_put_symbol(out, c);
-        }
+    if (s == name) *result = true;
+}
+
+bool is_capability_requested(const std::string& name, pn_data_t* capabilities)
+{
+    bool result(false);
+    readCapabilities(capabilities, boost::bind(&matchCapability, name, &result, _1));
+    return result;
+}
+
+void collectQueueCapabilities(boost::shared_ptr<Queue> node, std::vector<std::string>* supported, const std::string& s)
+{
+    if (s == DURABLE) {
+        if (node->isDurable()) supported->push_back(s);
+    } else if (s == CREATE_ON_DEMAND || s == QUEUE || s == DIRECT_FILTER || s == TOPIC_FILTER) {
+        supported->push_back(s);
+    }
+}
+
+void collectExchangeCapabilities(boost::shared_ptr<Exchange> node, std::vector<std::string>* supported, const std::string& s)
+{
+    if (s == DURABLE) {
+        if (node->isDurable()) supported->push_back(s);
+    } else if (s == SHARED) {
+        supported->push_back(s);
+    } else if (s == CREATE_ON_DEMAND || s == TOPIC) {
+        supported->push_back(s);
+    } else if (s == DIRECT_FILTER) {
+        if (node->getType() == DirectExchange::typeName) supported->push_back(s);
+    } else if (s == TOPIC_FILTER) {
+        if (node->getType() == TopicExchange::typeName) supported->push_back(s);
     }
 }
 
+void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Queue> node)
+{
+    std::vector<std::string> supported;
+    readCapabilities(in, boost::bind(&collectQueueCapabilities, node, &supported, _1));
+    writeCapabilities(out, supported);
+}
+
+void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Exchange> node)
+{
+    std::vector<std::string> supported;
+    readCapabilities(in, boost::bind(&collectExchangeCapabilities, node, &supported, _1));
+    writeCapabilities(out, supported);
+}
+
 }
 
 class IncomingToQueue : public DecodingIncoming
@@ -150,6 +201,12 @@ Session::ResolvedNode Session::resolve(c
     node.queue = connection.getBroker().getQueues().find(name);
     node.topic = connection.getTopics().get(name);
     if (node.topic) node.exchange = node.topic->getExchange();
+    if (node.exchange && !node.queue && is_capability_requested(CREATE_ON_DEMAND, pn_terminus_capabilities(terminus))) {
+        node.properties.read(pn_terminus_properties(terminus));
+        if (!node.properties.getExchangeType().empty() && node.properties.getExchangeType() != node.exchange->getType()) {
+            throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, "Exchange of different type already exists");
+        }
+    }
     if (!node.queue && !node.exchange) {
         if (pn_terminus_is_dynamic(terminus)  || is_capability_requested(CREATE_ON_DEMAND, pn_terminus_capabilities(terminus))) {
             //is it a queue or an exchange?

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp?rev=1525040&r1=1525039&r2=1525040&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp Fri Sep 20 15:43:34 2013
@@ -426,16 +426,23 @@ void AddressHelper::checkAssertion(pn_te
         QPID_LOG(debug, "checking assertions: " << capabilities);
         //ensure all desired capabilities have been offered
         std::set<std::string> desired;
-        if (type.size()) desired.insert(type);
-        if (durableNode) desired.insert(DURABLE);
         for (Variant::List::const_iterator i = capabilities.begin(); i != capabilities.end(); ++i) {
-            desired.insert(i->asString());
+            if (*i != CREATE_ON_DEMAND) desired.insert(i->asString());
         }
         pn_data_t* data = pn_terminus_capabilities(terminus);
-        while (pn_data_next(data)) {
-            pn_bytes_t c = pn_data_get_symbol(data);
-            std::string s(c.start, c.size);
-            desired.erase(s);
+        if (pn_data_next(data)) {
+            pn_type_t type = pn_data_type(data);
+            if (type == PN_ARRAY) {
+                pn_data_enter(data);
+                while (pn_data_next(data)) {
+                    desired.erase(convert(pn_data_get_symbol(data)));
+                }
+                pn_data_exit(data);
+            } else if (type == PN_SYMBOL) {
+                desired.erase(convert(pn_data_get_symbol(data)));
+            } else {
+                QPID_LOG(error, "Skipping capabilities field of type " << pn_type_name(type));
+            }
         }
 
         if (desired.size()) {
@@ -614,12 +621,20 @@ void AddressHelper::configure(pn_link_t*
 
 void AddressHelper::setCapabilities(pn_terminus_t* terminus, bool create)
 {
+    if (create) capabilities.push_back(CREATE_ON_DEMAND);
+    if (!type.empty()) capabilities.push_back(type);
+    if (durableNode) capabilities.push_back(DURABLE);
+
     pn_data_t* data = pn_terminus_capabilities(terminus);
-    if (create) pn_data_put_symbol(data, convert(CREATE_ON_DEMAND));
-    if (type.size()) pn_data_put_symbol(data, convert(type));
-    if (durableNode) pn_data_put_symbol(data, convert(DURABLE));
-    for (qpid::types::Variant::List::const_iterator i = capabilities.begin(); i != capabilities.end(); ++i) {
-        pn_data_put_symbol(data, convert(i->asString()));
+    if (capabilities.size() == 1) {
+        pn_data_put_symbol(data, convert(capabilities.front().asString()));
+    } else if (capabilities.size() > 1) {
+        pn_data_put_array(data, false, PN_SYMBOL);
+        pn_data_enter(data);
+        for (qpid::types::Variant::List::const_iterator i = capabilities.begin(); i != capabilities.end(); ++i) {
+            pn_data_put_symbol(data, convert(i->asString()));
+        }
+        pn_data_exit(data);
     }
 }
 std::string AddressHelper::getLinkName(const Address& address)

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1525040&r1=1525039&r2=1525040&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Fri Sep 20 15:43:34 2013
@@ -265,8 +265,8 @@ void ConnectionContext::attach(boost::sh
     qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
     lnk->configure();
     attach(lnk->sender);
-    lnk->verify();
     checkClosed(ssn, lnk);
+    lnk->verify();
     QPID_LOG(debug, "Attach succeeded to " << lnk->getTarget());
 }
 
@@ -275,8 +275,8 @@ void ConnectionContext::attach(boost::sh
     qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
     lnk->configure();
     attach(lnk->receiver, lnk->capacity);
-    lnk->verify();
     checkClosed(ssn, lnk);
+    lnk->verify();
     QPID_LOG(debug, "Attach succeeded from " << lnk->getSource());
 }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org