You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2009/11/20 00:07:37 UTC

svn commit: r882349 - /qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp

Author: gsim
Date: Thu Nov 19 23:07:37 2009
New Revision: 882349

URL: http://svn.apache.org/viewvc?rev=882349&view=rev
Log:
QPID-664: Refactored address resolution; ensure type is asserted on when required; moved exclusive option for subscribe.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp?rev=882349&r1=882348&r2=882349&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp Thu Nov 19 23:07:37 2009
@@ -60,6 +60,7 @@
 
 
 namespace{
+const Variant EMPTY_VARIANT;
 const FieldTable EMPTY_FIELD_TABLE;
 const std::string EMPTY_STRING;
 
@@ -160,6 +161,7 @@
 
   private:
     std::string type;
+    bool typeSpecified;
     bool durable;
     bool autoDelete;
     std::string alternateExchange;
@@ -269,50 +271,42 @@
     return in(address.getOption(RELIABILITY), list_of<std::string>(AT_LEAST_ONCE)(EXACTLY_ONCE));
 }
 
-std::auto_ptr<MessageSource> AddressResolution::resolveSource(qpid::client::Session session,
-                                                              const Address& address)
+std::string checkAddressType(qpid::client::Session session, const Address& address)
 {
-    ExchangeBoundResult result = session.exchangeBound(arg::exchange=address.getName(), arg::queue=address.getName());
-    if (result.getQueueNotFound() && result.getExchangeNotFound()) {
-        //neither a queue nor an exchange exists with that name
-        if (address.getType() == TOPIC_ADDRESS) {
-            std::auto_ptr<MessageSource> source(new Subscription(address));
-            QPID_LOG(debug, "treating source address as topic: " << address);
-            return source;
-        } else if (address.getType() == QUEUE_ADDRESS || address.getType().empty()) {
-            std::auto_ptr<MessageSource> source(new QueueSource(address));
-            QPID_LOG(debug, "treating source address as queue: " << address);
-            return source;
+    std::string type = address.getType();
+    if (type.empty()) {
+        ExchangeBoundResult result = session.exchangeBound(arg::exchange=address.getName(), arg::queue=address.getName());
+        if (result.getQueueNotFound() && result.getExchangeNotFound()) {
+            //neither a queue nor an exchange exists with that name; treat it as a queue
+            type = QUEUE_ADDRESS;
+        } else if (result.getExchangeNotFound()) {
+            //name refers to a queue
+            type = QUEUE_ADDRESS;
+        } else if (result.getQueueNotFound()) {
+            //name refers to an exchange
+            type = TOPIC_ADDRESS;
         } else {
-            throw InvalidAddress("Unrecognised type: " + address.getType());
+            //both a queue and exchange exist for that name
+            throw InvalidAddress("Ambiguous address, please specify queue or topic as node type");
         }
-    } else if (result.getQueueNotFound()) {
-        //only an exchange exists with that name
-        qpid::framing::ExchangeQueryResult result = session.exchangeQuery(address.getName());
-        std::auto_ptr<MessageSource> source(new Subscription(address, result.getType()));
-        QPID_LOG(debug, "resolved source address as topic: " << address);
+    }
+    return type;
+}
+
+std::auto_ptr<MessageSource> AddressResolution::resolveSource(qpid::client::Session session,
+                                                              const Address& address)
+{
+    std::string type = checkAddressType(session, address);
+    if (type == TOPIC_ADDRESS) {
+        std::auto_ptr<MessageSource> source(new Subscription(address));
+        QPID_LOG(debug, "treating source address as topic: " << address);
         return source;
-    } else if (result.getExchangeNotFound()) {
-        //only an queue exists with that name
+    } else if (type == QUEUE_ADDRESS) {
         std::auto_ptr<MessageSource> source(new QueueSource(address));
-        QPID_LOG(debug, "resolved source address as queue: " << address);
+        QPID_LOG(debug, "treating source address as queue: " << address);
         return source;
     } else {
-        //both a queue and exchange exist for that name
-        if (address.getType() == TOPIC_ADDRESS) {
-            qpid::framing::ExchangeQueryResult result = session.exchangeQuery(address.getName());
-            std::auto_ptr<MessageSource> source(new Subscription(address, result.getType()));
-            QPID_LOG(debug, "resolved source address as topic: " << address);
-            return source;
-        } else if (address.getType() == QUEUE_ADDRESS) {
-            std::auto_ptr<MessageSource> source(new QueueSource(address));
-            QPID_LOG(debug, "resolved source address as queue: " << address);
-            return source;
-        } else if (address.getType().empty()) {
-            throw InvalidAddress("Ambiguous address, please specify queue or topic as node type");
-        } else {
-            throw InvalidAddress("Unrecognised type: " + address.getType());
-        }
+        throw InvalidAddress("Unrecognised type: " + type);
     }
 }
 
@@ -320,45 +314,29 @@
 std::auto_ptr<MessageSink> AddressResolution::resolveSink(qpid::client::Session session,
                                                           const qpid::messaging::Address& address)
 {
-    ExchangeBoundResult result = session.exchangeBound(arg::exchange=address.getName(), arg::queue=address.getName());
-    if (result.getQueueNotFound() && result.getExchangeNotFound()) {
-        //neither a queue nor an exchange exists with that name
-        if (address.getType() == TOPIC_ADDRESS) {
-            std::auto_ptr<MessageSink> sink(new ExchangeSink(address));
-            QPID_LOG(debug, "treating target address as topic: " << address);
-            return sink;
-        } else if (address.getType() == QUEUE_ADDRESS || address.getType().empty()) {
-            std::auto_ptr<MessageSink> sink(new QueueSink(address));
-            QPID_LOG(debug, "treating target address as queue: " << address);
-            return sink;
-        } else {
-            throw InvalidAddress("Unrecognised type: " + address.getType());
-        }
-    } else if (result.getQueueNotFound()) {
-        //only an exchange exists with that name
+    std::string type = checkAddressType(session, address);
+    if (type == TOPIC_ADDRESS) {
         std::auto_ptr<MessageSink> sink(new ExchangeSink(address));
-        QPID_LOG(debug, "resolved target address as topic: " << address);
+        QPID_LOG(debug, "treating target address as topic: " << address);
         return sink;
-    } else if (result.getExchangeNotFound()) {
-        //only an queue exists with that name
+    } else if (type == QUEUE_ADDRESS) {
         std::auto_ptr<MessageSink> sink(new QueueSink(address));
-        QPID_LOG(debug, "resolved target address as queue: " << address);
+        QPID_LOG(debug, "treating target address as queue: " << address);
         return sink;
     } else {
-        //both a queue and exchange exist for that name
-        if (address.getType() == TOPIC_ADDRESS) {
-            std::auto_ptr<MessageSink> sink(new ExchangeSink(address));
-            QPID_LOG(debug, "resolved target address as topic: " << address);
-            return sink;
-        } else if (address.getType() == QUEUE_ADDRESS) {
-            std::auto_ptr<MessageSink> sink(new QueueSink(address));
-            QPID_LOG(debug, "resolved target address as queue: " << address);
-            return sink;
-        } else if (address.getType().empty()) {
-            throw InvalidAddress("Ambiguous address, please specify queue or topic as node type");
-        } else {
-            throw InvalidAddress("Unrecognised type: " + address.getType());
-        }
+        throw InvalidAddress("Unrecognised type: " + type);
+    }
+}
+
+const Variant& getNestedOption(const Variant::Map& options, const std::vector<std::string>& keys, size_t index = 0)
+{
+    Variant::Map::const_iterator i = options.find(keys[index]);
+    if (i == options.end()) {
+        return EMPTY_VARIANT;
+    } else if (index+1 < keys.size()) {
+        return getNestedOption(i->second.asMap(), keys, index+1);
+    } else {
+        return i->second;
     }
 }
 
@@ -366,7 +344,7 @@
     Queue(address),
     acceptMode(is_unreliable(address) ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT),
     acquireMode(address.getOption(BROWSE).asBool() ? ACQUIRE_MODE_NOT_ACQUIRED : ACQUIRE_MODE_PRE_ACQUIRED),
-    exclusive(address.getOption(EXCLUSIVE).asBool())
+    exclusive(getNestedOption(address.getOptions(), list_of<std::string>(X_PROPERTIES)(EXCLUSIVE)).asBool())
 {
     //extract subscription arguments from address options
     convert(address.getOption(xamqp::SUBSCRIBE_ARGUMENTS), options);
@@ -713,6 +691,7 @@
 
 Exchange::Exchange(const Address& a) : Node(a),
                                        type(TOPIC_EXCHANGE),
+                                       typeSpecified(false),
                                        durable(false),
                                        autoDelete(false)
 {
@@ -735,8 +714,10 @@
     } else {
         try {
             sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true);
-        } catch (const qpid::Exception& e) {
-            throw InvalidAddress((boost::format("Exchange %1% does not exist; %2%") % name % e.what()).str());
+        } catch (const qpid::framing::NotFoundException& e) {
+            throw InvalidAddress((boost::format("Exchange %1% does not exist") % name).str());
+        } catch (const std::exception& e) {
+            throw InvalidAddress(e.what());
         }
     }
 }
@@ -756,11 +737,11 @@
 void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
 {
     if (enabled(assertPolicy, mode)) {
-        ExchangeQueryResult result = sync(session).exchangeQuery(arg::exchange=name);
+        ExchangeQueryResult result = sync(session).exchangeQuery(name);
         if (result.getNotFound()) {
             throw InvalidAddress((boost::format("Exchange not found: %1%") % name).str());
         } else {
-            if (!type.empty() && result.getType() != type) {
+            if (typeSpecified && result.getType() != type) {
                 throw InvalidAddress((boost::format("Exchange %1% is of incorrect type, expected %2% but got %3%") 
                                       % name % type % result.getType()).str());
             }
@@ -795,7 +776,7 @@
             Variant::Map passthrough;
             for (Variant::Map::const_iterator i = xProps.begin(); i != xProps.end(); ++i) {
                 if (i->first == xamqp::AUTO_DELETE) autoDelete = i->second;
-                else if (i->first == xamqp::EXCHANGE_TYPE) type = i->second.asString();
+                else if (i->first == xamqp::EXCHANGE_TYPE) { type = i->second.asString(); typeSpecified = true; }
                 else if (i->first == xamqp::ALTERNATE_EXCHANGE) alternateExchange = i->second.asString();
                 else passthrough[i->first] = i->second;
             }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org