You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2010/03/24 19:35:12 UTC

svn commit: r927144 - in /qpid/trunk/qpid/cpp: examples/messaging/readme.txt include/qpid/messaging/Address.h src/qpid/client/amqp0_10/AddressResolution.cpp src/qpid/messaging/Address.cpp src/tests/MessagingSessionTests.cpp

Author: gsim
Date: Wed Mar 24 18:35:11 2010
New Revision: 927144

URL: http://svn.apache.org/viewvc?rev=927144&view=rev
Log:
QPID-664: Updates to address options to stay in-line with python client changes in r926604

Modified:
    qpid/trunk/qpid/cpp/examples/messaging/readme.txt
    qpid/trunk/qpid/cpp/include/qpid/messaging/Address.h
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp
    qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp

Modified: qpid/trunk/qpid/cpp/examples/messaging/readme.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/examples/messaging/readme.txt?rev=927144&r1=927143&r2=927144&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/examples/messaging/readme.txt (original)
+++ qpid/trunk/qpid/cpp/examples/messaging/readme.txt Wed Mar 24 18:35:11 2010
@@ -96,7 +96,7 @@ of address (as there is no existing enti
 type and as we do not want the default type to be created, namely a
 queue):
 
-* run: ./drain -f --address 'my-new-topic; {create: always, node-properties:{type:topic}}'
+* run: ./drain -f --address 'my-new-topic; {create: always, node:{type:topic}}'
 * then run: ./spout --address my-new-queue
 
 The value to the create policy is one of always, sender, receiver or
@@ -128,19 +128,17 @@ qpid-config or even auto-create one):
 An example using xquery based filtering with the xml exchange:
 
 * First start a subscriber with an xquery filter specified:
-  ./drain -f --address 'xml/my-subject; {filter:{xquery:"declare variable $colour external; $colour = '\''red'\''"}}'
+  ./drain -f --address 'xml; {link:{x-bindings:[{arguments:{xquery:"declare variable $colour external; $colour = '\''red'\''"}}]}}'
 
 * Then test receipt of messages that match the xquery filter:
-  ./spout --address 'xml/my-subject' --property colour=red --content 'matched!'
+  ./spout --address 'xml' --property colour=red --content 'matched!'
     and
-  ./spout --address 'xml/my-subject' --property colour=blue --content 'not matched'
+  ./spout --address 'xml' --property colour=blue --content 'not matched'
 
 TODO:
 
 * auto-creating exchanges of different types
 
-* xml content in the xquery example
-
 * 'durable' and 'reliable' subscriptions
 
 * map content

Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Address.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Address.h?rev=927144&r1=927143&r2=927144&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Address.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Address.h Wed Mar 24 18:35:11 2010
@@ -65,82 +65,57 @@ class AddressImpl;
  *
  * <table border=0> 
  * 
- * <tr valign=top><td>create</td><td>Indicate whether the address should be
- * automatically created or not. Can be one of <i>always</i>,
- * <i>never</i>, <i>sender</i> or <i>receiver</i>. The properties of
- * the node to be created can be specified via the node-properties
- * option (see below).</td></tr>
- * 
- * <tr valign=top><td>assert</td><td>Indicate whether or not to assert any specified
- * node-properties match the address. Can be one of <i>always</i>,
- * <i>never</i>, <i>sender</i> or <i>receiver</i>.</td></tr>
- * 
- * <tr valign=top><td>delete</td><td>Indicate whether or not to delete the addressed
- * nide when a sender or receiver is cancelled. Can be one of <i>always</i>,
- * <i>never</i>, <i>sender</i> or <i>receiver</i>.</td></tr>
+ * <tr valign=top>
+ *   <td>create</td>
+ *   <td>Indicate whether the address should be automatically created
+ *       or not. Can be one of <i>always</i>, <i>never</i>,
+ *       <i>sender</i> or <i>receiver</i>. The properties of the node
+ *       to be created can be specified via the node options (see
+ *       below).
+ *   </td>
+ * </tr>
+ * 
+ * <tr valign=top>
+ *   <td>assert</td>
+ *   <td>Indicate whether or not to assert any specified node
+ *       properties(see below) match the address. Can be one of
+ *       <i>always</i>, <i>never</i>, <i>sender</i> or
+ *       <i>receiver</i>.
+ *   </td>
+ * </tr>
+ * 
+ * <tr valign=top>
+ *   <td>delete</td>
+ *   <td>Indicate whether or not to delete the addressed node when a
+ *       sender or receiver is cancelled. Can be one of <i>always</i>,
+ *       <i>never</i>, <i>sender</i> or <i>receiver</i>.
+ *   </td>
+ * </tr>
  *
- * <tr valign=top><td>reliability</td><td>indicates the level of
- * reliability expected. Can be one of unreliable, at-most-once,
- * at-least-once or exactly-once (the latter is not yet correctly
- * supported).</td></tr>
- * 
- * <tr valign=top><td>node-properties</td><td>A nested map of properties of the addressed
- * entity or 'node'. These can be used when automatically creating it,
- * or to assert certain properties.
- * 
- * The valid node-properties are:
- * <ul>
- * <li>type - queue or topic</li>
- * 
- * <li>durable - true or false</li>
- * 
- * <li>x-properties - a nested map that can contain implementation or
- * protocol specifiec extedned properties. For the amqp 0-10 mapping,
- * the fields in queue- or exchange- declare can be specified in here;
- * a bindings entry may also be specified, whose value should be an
- * array of strings of the form exchange/key; anything else will be
- * passed through in the arguments field.
- * </li> 
- * </ul> 
- * </td></tr>
- * 
- * </table>
+ * <tr valign=top>
+ *   <td>node</td>
+ *   <td>A nested map describing properties of the addressed
+ *       node. Current properties supported are type (topic or queue),
+ *       durable (boolean), x-declare and x-bindings.
+ *   </td>
+ * </tr>
+ *
+ * <tr valign=top>
+ *   <td>link</td>
+ *   <td>A nested map through which properties of the 'link' from
+ *       sender/receiver to node can be configured. Current propeties
+ *       are name, durable, realiability, x-declare, x-subscribe and
+ *       x-bindings.
+ *   </td>
+ * </tr>
  * 
- * For receivers there are some further options of interest:
+ * For receivers there is one other option of interest:
  * 
  * <table border=0 valign=top>
- * 
- * <tr valign=top><td>no-local</td><td>(only relevant for topics at present) specifies that the
- * receiver does not want to receiver messages published to the topic
- * that originate from a sender on the same connection</td></tr>
- *
  * <tr valign=top><td>mode</td><td>(only relevant for queues)
  * indicates whether the subscribe should consume (the default) or
  * merely browse the messages. Valid values are 'consume' and
  * 'browse'</td></tr>
- * 
- * <tr valign=top><td>durable</td><td>(only relevant for topics at present) specifies that a
- * durable subscription is required</td></tr>
- * 
- * <tr valign=top><td>filter</td><td>(only relevant for topics at present) allows bindings to
- * be created for the queue that match the given criteria (or list of
- * criteria).</td></tr>
- * 
- * <tr valign=top><td>x-properties</td><td>allows protocol or implementation specific options
- * to be specified for a receiver; this is a nested map and currently
- * the implementation only recognises two specific nested properties
- * within it (all others are passed through in the arguments of the
- * message-subscribe command):
- * 
- * <ul>
- *     <li>exclusive, which requests an exclusive subscription and
- *     is only relevant for queues</li>
- *
- *     <li>x-queue-arguments, which is only relevant for topics and
- *     allows arguments to the queue-declare for the subscription
- *     queue to be specified</li>
- * </ul>
- * </td></tr>
  * </table>
  */
 class Address

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp?rev=927144&r1=927143&r2=927144&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp Wed Mar 24 18:35:11 2010
@@ -61,40 +61,54 @@ using namespace boost::assign;
 namespace{
 const Variant EMPTY_VARIANT;
 const FieldTable EMPTY_FIELD_TABLE;
+ const Variant::List EMPTY_LIST;
 const std::string EMPTY_STRING;
 
-//option names
-const std::string BROWSE("browse");
-const std::string CONSUME("consume");
-const std::string EXCLUSIVE("exclusive");
-const std::string MODE("mode");
-const std::string NO_LOCAL("no-local");
-const std::string FILTER("filter");
-const std::string RELIABILITY("reliability");
-const std::string NAME("subscription-name");
-const std::string NODE_PROPERTIES("node-properties");
-const std::string X_PROPERTIES("x-properties");
-
 //policy types
 const std::string CREATE("create");
 const std::string ASSERT("assert");
 const std::string DELETE("delete");
+
+//option names
+const std::string NODE("node");
+const std::string LINK("link");
+const std::string MODE("mode");
+const std::string RELIABILITY("reliability");
+const std::string NAME("name");
+const std::string DURABLE("durable");
+const std::string X_DECLARE("x-declare");
+const std::string X_SUBSCRIBE("x-subscribe");
+const std::string X_BINDINGS("x-bindings");
+const std::string EXCHANGE("exchange");
+const std::string QUEUE("queue");
+const std::string KEY("key");
+const std::string ARGUMENTS("arguments");
+const std::string ALTERNATE_EXCHANGE("alternate-exchange");
+const std::string TYPE("type");
+const std::string EXCLUSIVE("exclusive");
+const std::string AUTO_DELETE("auto-delete");
+
 //policy values
 const std::string ALWAYS("always");
 const std::string NEVER("never");
 const std::string RECEIVER("receiver");
 const std::string SENDER("sender");
 
+//address types
 const std::string QUEUE_ADDRESS("queue");
 const std::string TOPIC_ADDRESS("topic");
 
+//reliability options:
 const std::string UNRELIABLE("unreliable");
 const std::string AT_MOST_ONCE("at-most-once");
 const std::string AT_LEAST_ONCE("at-least-once");
 const std::string EXACTLY_ONCE("exactly-once");
-const std::string DURABLE_SUBSCRIPTION("durable");
-const std::string DURABLE("durable");
 
+//receiver modes:
+const std::string BROWSE("browse");
+const std::string CONSUME("consume");
+
+//0-10 exchange types:
 const std::string TOPIC_EXCHANGE("topic");
 const std::string FANOUT_EXCHANGE("fanout");
 const std::string DIRECT_EXCHANGE("direct");
@@ -103,16 +117,26 @@ const std::string XML_EXCHANGE("xml");
 const std::string WILDCARD_ANY("*");
 }
 
-//some amqp 0-10 specific options
-namespace xamqp{
-const std::string AUTO_DELETE("auto-delete");
-const std::string EXCHANGE_TYPE("type");
-const std::string EXCLUSIVE("exclusive");
-const std::string ALTERNATE_EXCHANGE("alternate-exchange");
-const std::string QUEUE_ARGUMENTS("x-queue-arguments");
-const std::string SUBSCRIBE_ARGUMENTS("x-subscribe-arguments");
-const std::string BINDINGS("bindings");
-}
+struct Binding
+{
+    Binding(const Variant::Map&);
+    Binding(const std::string& exchange, const std::string& queue, const std::string& key);
+    
+    std::string exchange;
+    std::string queue;
+    std::string key;
+    FieldTable arguments;
+};
+
+struct Bindings : std::vector<Binding>
+{
+    void add(const Variant::List& bindings);
+    void setDefaultExchange(const std::string&);
+    void setDefaultQueue(const std::string&);
+    void bind(qpid::client::AsyncSession& session);
+    void unbind(qpid::client::AsyncSession& session);
+    void check(qpid::client::AsyncSession& session);
+};
 
 class Node
 {
@@ -125,6 +149,8 @@ class Node
     Variant createPolicy;
     Variant assertPolicy;
     Variant deletePolicy;
+    Bindings nodeBindings;
+    Bindings linkBindings;
 
     static bool enabled(const Variant& policy, CheckMode mode);
     static bool createEnabled(const Address& address, CheckMode mode);
@@ -133,17 +159,6 @@ class Node
     static std::vector<std::string> SENDER_MODES;
 };
 
-struct Binding
-{
-    Binding(const std::string& exchange, const std::string& key, const FieldTable& options = EMPTY_FIELD_TABLE);
-    
-    std::string exchange;
-    std::string key;
-    FieldTable options;
-};
-
-typedef std::vector<Binding> Bindings;
-
 
 class Queue : protected Node
 {
@@ -154,16 +169,11 @@ class Queue : protected Node
     void checkAssert(qpid::client::AsyncSession&, CheckMode);
     void checkDelete(qpid::client::AsyncSession&, CheckMode);
   private:
-    bool durable;
-    bool autoDelete;
-    bool exclusive;
-    std::string alternateExchange;
+    const bool durable;
+    const bool autoDelete;
+    const bool exclusive;
+    const std::string alternateExchange;
     FieldTable arguments;
-    Bindings bindings;
-
-    void configure(const Address&);
-    void addBindings(const Variant::List&);
-    void addBinding(const std::string&);
 };
 
 class Exchange : protected Node
@@ -174,17 +184,14 @@ class Exchange : protected Node
     void checkCreate(qpid::client::AsyncSession&, CheckMode);
     void checkAssert(qpid::client::AsyncSession&, CheckMode);
     void checkDelete(qpid::client::AsyncSession&, CheckMode);
-    const std::string& getDesiredExchangeType() { return type; }
 
+  protected:
+    const std::string specifiedType;
   private:
-    std::string type;
-    bool typeSpecified;
-    bool durable;
-    bool autoDelete;
-    std::string alternateExchange;
-    FieldTable arguments;    
-
-    void configure(const Address&);
+    const bool durable;
+    const bool autoDelete;
+    const std::string alternateExchange;
+    FieldTable arguments;
 };
 
 class QueueSource : public Queue, public MessageSource
@@ -203,24 +210,22 @@ class QueueSource : public Queue, public
 class Subscription : public Exchange, public MessageSource
 {
   public:
-    Subscription(const Address&, const std::string& exchangeType="");
+    Subscription(const Address&, const std::string& actualType);
     void subscribe(qpid::client::AsyncSession& session, const std::string& destination);
     void cancel(qpid::client::AsyncSession& session, const std::string& destination);
   private:
     const std::string queue;
     const bool reliable;
     const bool durable;
+    const std::string actualType;
     FieldTable queueOptions;
     FieldTable subscriptionOptions;
     Bindings bindings;
     
-    void bindSpecial(const std::string& exchangeType);
-    void bind(const std::string& subject);
-    void bind(const std::string& subject, const Variant& filter);
-    void bind(const std::string& subject, const Variant::Map& filter);
-    void bind(const std::string& subject, const Variant::List& filter);
-    void add(const std::string& exchange, const std::string& key, const FieldTable& options = EMPTY_FIELD_TABLE);
-    static std::string getSubscriptionName(const std::string& base, const Variant& name);
+    void bindSubject(const std::string& subject);
+    void bindAll();
+    void add(const std::string& exchange, const std::string& key);
+    static std::string getSubscriptionName(const std::string& base, const std::string& name);
 };
 
 class ExchangeSink : public Exchange, public MessageSink
@@ -267,14 +272,102 @@ bool getSenderPolicy(const Address& addr
     return in(address.getOption(key), list_of<std::string>(ALWAYS)(SENDER));
 }
 
+const Variant& getOption(const Variant::Map& options, const std::vector<std::string>& path, size_t index=0)
+{
+    Variant::Map::const_iterator j = options.find(path[index]);
+    if (j == options.end()) {
+        return EMPTY_VARIANT;
+    } else if (++index < path.size()) {
+        if (j->second.getType() != qpid::messaging::VAR_MAP) 
+            throw InvalidAddress((boost::format("Expected %1% to be a map") % j->first).str());
+        return getOption(j->second.asMap(), path, index);
+    } else {
+        return j->second;
+    }
+}
+
+const Variant& getOption(const Address& address, const std::vector<std::string>& path)
+{
+    return getOption(address.getOptions(), path);
+}
+
+const Variant& getOption(const Variant::Map& options, const std::string& name)
+{
+    Variant::Map::const_iterator j = options.find(name);
+    if (j == options.end()) {
+        return EMPTY_VARIANT;
+    } else {
+        return j->second;
+    }
+}
+
+struct Opt
+{
+    Opt(const Address& address);
+    Opt(const Variant::Map& base);
+    Opt& operator/(const std::string& name);
+    operator bool() const;
+    std::string str() const;
+    const Variant::List& asList() const;
+    void collect(qpid::framing::FieldTable& args) const;
+
+    const Variant::Map* options;
+    const Variant* value;
+};
+
+Opt::Opt(const Address& address) : options(&(address.getOptions())), value(0) {}
+Opt::Opt(const Variant::Map& base) : options(&base), value(0) {}
+Opt& Opt::operator/(const std::string& name)
+{
+    if (options) {
+        Variant::Map::const_iterator j = options->find(name);
+        if (j == options->end()) {            
+            value = 0;
+            options = 0;
+        } else {
+            value = &(j->second);
+            if (value->getType() == qpid::messaging::VAR_MAP) options = &(value->asMap());
+            else options = 0;
+        }
+    }
+    return *this;
+}
+
+
+Opt::operator bool() const
+{
+    return value && !value->isVoid() && value->asBool();
+}
+
+std::string Opt::str() const
+{
+    if (value) return value->asString();
+    else return EMPTY_STRING;
+}
+
+const Variant::List& Opt::asList() const
+{
+    if (value) return value->asList();
+    else return EMPTY_LIST;
+}
+
+void Opt::collect(qpid::framing::FieldTable& args) const
+{
+    if (value) {
+        translate(value->asMap(), args);
+    }
+}
+
 bool AddressResolution::is_unreliable(const Address& address)
 {
-    return in(address.getOption(RELIABILITY), list_of<std::string>(UNRELIABLE)(AT_MOST_ONCE));
+    return in(getOption(address, list_of<std::string>(LINK)(RELIABILITY)), 
+              list_of<std::string>(UNRELIABLE)(AT_MOST_ONCE));
 }
 
 bool AddressResolution::is_reliable(const Address& address)
 {
-    return in(address.getOption(RELIABILITY), list_of<std::string>(AT_LEAST_ONCE)(EXACTLY_ONCE));
+    return in(getOption(address, list_of<std::string>(LINK)(RELIABILITY)), 
+              list_of<std::string>(AT_LEAST_ONCE)(EXACTLY_ONCE));
 }
 
 std::string checkAddressType(qpid::client::Session session, const Address& address)
@@ -282,7 +375,7 @@ std::string checkAddressType(qpid::clien
     if (address.getName().empty()) {
         throw InvalidAddress("Name cannot be null");
     }
-    std::string type = address.getType();
+    std::string type = (Opt(address)/NODE/TYPE).str();
     if (type.empty()) {
         ExchangeBoundResult result = session.exchangeBound(arg::exchange=address.getName(), arg::queue=address.getName());
         if (result.getQueueNotFound() && result.getExchangeNotFound()) {
@@ -307,7 +400,8 @@ std::auto_ptr<MessageSource> AddressReso
 {
     std::string type = checkAddressType(session, address);
     if (type == TOPIC_ADDRESS) {
-        std::auto_ptr<MessageSource> source(new Subscription(address));
+        std::string exchangeType = sync(session).exchangeQuery(address.getName()).getType();
+        std::auto_ptr<MessageSource> source(new Subscription(address, exchangeType));
         QPID_LOG(debug, "treating source address as topic: " << address);
         return source;
     } else if (type == QUEUE_ADDRESS) {
@@ -337,18 +431,6 @@ std::auto_ptr<MessageSink> AddressResolu
     }
 }
 
-const Variant& getNestedOption(const Variant::Map& options, const std::vector<std::string>& keys, size_t index = 0)
-{
-    Variant::Map::const_iterator i = options.find(keys[index]);
-    if (i == options.end()) {
-        return EMPTY_VARIANT;
-    } else if (index+1 < keys.size()) {
-        return getNestedOption(i->second.asMap(), keys, index+1);
-    } else {
-        return i->second;
-    }
-}
-
 bool isBrowse(const Address& address)
 {
     const Variant& mode = address.getOption(MODE);
@@ -366,23 +448,18 @@ QueueSource::QueueSource(const Address& 
     acquireMode(isBrowse(address) ? ACQUIRE_MODE_NOT_ACQUIRED : ACQUIRE_MODE_PRE_ACQUIRED),
     exclusive(false)
 {
-    //extract subscription arguments from address options
-    const Variant& x = address.getOption(X_PROPERTIES);
-    if (!x.isVoid()) {
-        const Variant::Map& xProps = x.asMap();
-        Variant::Map passthrough;
-        for (Variant::Map::const_iterator i = xProps.begin(); i != xProps.end(); ++i) {
-            if (i->first == xamqp::EXCLUSIVE) exclusive = i->second;
-            else passthrough[i->first] = i->second;
-        }
-        translate(passthrough, options);
-    }
+    //extract subscription arguments from address options (nb: setting
+    //of accept-mode/acquire-mode/destination controlled though other
+    //options)
+    exclusive = Opt(address)/NODE/LINK/X_SUBSCRIBE/EXCLUSIVE;
+    (Opt(address)/NODE/LINK/X_SUBSCRIBE/ARGUMENTS).collect(options);
 }
 
 void QueueSource::subscribe(qpid::client::AsyncSession& session, const std::string& destination)
 {
     checkCreate(session, FOR_RECEIVER);
     checkAssert(session, FOR_RECEIVER);
+    linkBindings.bind(session);
     session.messageSubscribe(arg::queue=name, 
                              arg::destination=destination,
                              arg::acceptMode=acceptMode,
@@ -393,58 +470,72 @@ void QueueSource::subscribe(qpid::client
 
 void QueueSource::cancel(qpid::client::AsyncSession& session, const std::string& destination)
 {
+    linkBindings.unbind(session);
     session.messageCancel(destination);
     checkDelete(session, FOR_RECEIVER);
 }
 
-std::string Subscription::getSubscriptionName(const std::string& base, const Variant& name)
+std::string Subscription::getSubscriptionName(const std::string& base, const std::string& name)
 {
-    if (name.isVoid()) {
+    if (name.empty()) {
         return (boost::format("%1%_%2%") % base % Uuid(true).str()).str();
     } else {
-        return (boost::format("%1%_%2%") % base % name.asString()).str();
+        return (boost::format("%1%_%2%") % base % name).str();
     }
 }
 
-Subscription::Subscription(const Address& address, const std::string& exchangeType)
+Subscription::Subscription(const Address& address, const std::string& type)
     : Exchange(address),
-      queue(getSubscriptionName(name, address.getOption(NAME))),
+      queue(getSubscriptionName(name, (Opt(address)/LINK/NAME).str())),
       reliable(AddressResolution::is_reliable(address)),
-      durable(address.getOption(DURABLE_SUBSCRIPTION).asBool())
+      durable(Opt(address)/LINK/DURABLE),
+      actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type)
 {
-    if (address.getOption(NO_LOCAL).asBool()) queueOptions.setInt(NO_LOCAL, 1);
-    const Variant& x = address.getOption(X_PROPERTIES);
-    if (!x.isVoid()) {
-        const Variant::Map& xProps = x.asMap();
-        Variant::Map passthrough;
-        for (Variant::Map::const_iterator i = xProps.begin(); i != xProps.end(); ++i) {
-            if (i->first == xamqp::QUEUE_ARGUMENTS) convert(i->second.asMap(), queueOptions);
-            else passthrough[i->first] = i->second;
-        }
-        translate(passthrough, subscriptionOptions);
-    }
+    (Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions);
+    (Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions);
 
-    const Variant& filter = address.getOption(FILTER);
-    if (!filter.isVoid()) {
-        bind(address.getSubject(), filter);
-    } else if (address.hasSubject()) {
-        //Note: This will not work for headers- or xml- exchange;
-        //fanout exchange will do no filtering.
-        //TODO: for headers- or xml- exchange can construct a match
-        //for the subject in the application-headers
-        bind(address.getSubject());
+    if (!address.getSubject().empty()) bindSubject(address.getSubject());
+    else if (linkBindings.empty()) bindAll();
+}
+
+void Subscription::bindSubject(const std::string& subject)
+{
+    if (actualType == HEADERS_EXCHANGE) {
+        Binding b(name, queue, subject);
+        b.arguments.setString("qpid.subject", subject);
+        b.arguments.setString("x-match", "all");
+        bindings.push_back(b);
+    } else if (actualType == XML_EXCHANGE) {
+        Binding b(name, queue, subject);
+        std::string query = (boost::format("declare variable $qpid.subject external; $qpid.subject = '%1%'") 
+                                           % subject).str();
+        b.arguments.setString("xquery", query);
+        bindings.push_back(b);
     } else {
-        //Neither a subject nor a filter has been defined, treat this
-        //as wanting to match all messages (Note: direct exchange is
-        //currently unable to support this case).
-        if (!exchangeType.empty()) bindSpecial(exchangeType);
-        else if (!getDesiredExchangeType().empty()) bindSpecial(getDesiredExchangeType());
+        //Note: the fanout exchange doesn't support any filtering, so
+        //the subject is ignored in that case
+        add(name, subject);
     }
 }
 
-void Subscription::add(const std::string& exchange, const std::string& key, const FieldTable& options)
+void Subscription::bindAll()
 {
-    bindings.push_back(Binding(exchange, key, options));
+    if (actualType == TOPIC_EXCHANGE) {
+        add(name, WILDCARD_ANY);
+    } else if (actualType == FANOUT_EXCHANGE) {
+        add(name, queue);        
+    } else if (actualType == HEADERS_EXCHANGE) {
+        Binding b(name, queue, "match-all");
+        b.arguments.setString("x-match", "all");
+        bindings.push_back(b);
+    } else { //E.g. direct and xml
+        throw qpid::Exception(QPID_MSG("Cannot create binding to match all messages for exchange of type " << actualType));
+    }
+}
+
+void Subscription::add(const std::string& exchange, const std::string& key)
+{
+    bindings.push_back(Binding(exchange, queue, key));
 }
 
 void Subscription::subscribe(qpid::client::AsyncSession& session, const std::string& destination)
@@ -456,10 +547,11 @@ void Subscription::subscribe(qpid::clien
     //create subscription queue:
     session.queueDeclare(arg::queue=queue, arg::exclusive=true, 
                          arg::autoDelete=!reliable, arg::durable=durable, arg::arguments=queueOptions);
-    //bind subscription queue to exchange:
-    for (Bindings::const_iterator i = bindings.begin(); i != bindings.end(); ++i) {
-        session.exchangeBind(arg::queue=queue, arg::exchange=i->exchange, arg::bindingKey=i->key, arg::arguments=i->options);
-    }
+    //'default' binding:
+    bindings.bind(session);
+    //any explicit bindings:
+    linkBindings.setDefaultQueue(queue);
+    linkBindings.bind(session);
     //subscribe to subscription queue:
     AcceptMode accept = reliable ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE;
     session.messageSubscribe(arg::queue=queue, arg::destination=destination, 
@@ -468,20 +560,19 @@ void Subscription::subscribe(qpid::clien
 
 void Subscription::cancel(qpid::client::AsyncSession& session, const std::string& destination)
 {
+    linkBindings.unbind(session);
     session.messageCancel(destination);
     session.queueDelete(arg::queue=queue);
     checkDelete(session, FOR_RECEIVER);
 }
 
-Binding::Binding(const std::string& e, const std::string& k, const FieldTable& o):
-    exchange(e), key(k), options(o) {}
-
 ExchangeSink::ExchangeSink(const Address& address) : Exchange(address) {}
 
 void ExchangeSink::declare(qpid::client::AsyncSession& session, const std::string&)
 {
     checkCreate(session, FOR_SENDER);
     checkAssert(session, FOR_SENDER);
+    linkBindings.bind(session);
 }
 
 void ExchangeSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
@@ -492,6 +583,7 @@ void ExchangeSink::send(qpid::client::As
 
 void ExchangeSink::cancel(qpid::client::AsyncSession& session, const std::string&)
 {
+    linkBindings.unbind(session);
     checkDelete(session, FOR_SENDER);    
 }
 
@@ -501,6 +593,7 @@ void QueueSink::declare(qpid::client::As
 {
     checkCreate(session, FOR_SENDER);
     checkAssert(session, FOR_SENDER);
+    linkBindings.bind(session);
 }
 void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
 {
@@ -510,6 +603,7 @@ void QueueSink::send(qpid::client::Async
 
 void QueueSink::cancel(qpid::client::AsyncSession& session, const std::string&)
 {
+    linkBindings.unbind(session);
     checkDelete(session, FOR_SENDER);
 }
 
@@ -556,68 +650,24 @@ bool isTopic(qpid::client::Session sessi
     }
 }
 
-void Subscription::bind(const std::string& subject)
-{
-    add(name, subject);
-}
-
-void Subscription::bind(const std::string& subject, const Variant& filter)
-{
-    switch (filter.getType()) {
-      case qpid::messaging::VAR_MAP:
-        bind(subject, filter.asMap());
-        break;
-      case qpid::messaging::VAR_LIST:
-        bind(subject, filter.asList());
-        break;
-      default:
-        //TODO: if both subject _and_ filter are specified, combine in
-        //some way; for now we just ignore the subject in that case.
-        add(name, filter.asString());
-        break;
-    }
-}
-
-void Subscription::bind(const std::string& subject, const Variant::Map& filter)
-{
-    qpid::framing::FieldTable arguments;
-    translate(filter, arguments);
-    add(name, subject.empty() ? queue : subject, arguments);
-}
-
-void Subscription::bind(const std::string& subject, const Variant::List& filter)
-{
-    for (Variant::List::const_iterator i = filter.begin(); i != filter.end(); ++i) {
-        bind(subject, *i);
-    }
-}
-
-void Subscription::bindSpecial(const std::string& exchangeType)
-{
-    if (exchangeType == TOPIC_EXCHANGE) {
-        add(name, WILDCARD_ANY);        
-    } else if (exchangeType == FANOUT_EXCHANGE) {
-        add(name, queue);        
-    } else if (exchangeType == HEADERS_EXCHANGE) {
-        //TODO: add special binding for headers exchange to match all messages
-    } else if (exchangeType == XML_EXCHANGE) {
-        //TODO: add special binding for xml exchange to match all messages
-    } else { //E.g. direct
-        throw qpid::Exception(QPID_MSG("Cannot create binding to match all messages for exchange of type " << exchangeType));
-    }
-}
-
 Node::Node(const Address& address) : name(address.getName()),
                                      createPolicy(address.getOption(CREATE)),
                                      assertPolicy(address.getOption(ASSERT)),
-                                     deletePolicy(address.getOption(DELETE)) {}
+                                     deletePolicy(address.getOption(DELETE))
+{
+    nodeBindings.add((Opt(address)/NODE/X_BINDINGS).asList());
+    linkBindings.add((Opt(address)/LINK/X_BINDINGS).asList());
+}
 
 Queue::Queue(const Address& a) : Node(a),
-                                 durable(false),
-                                 autoDelete(false),
-                                 exclusive(false)
-{
-    configure(a);
+                                 durable(Opt(a)/NODE/DURABLE),
+                                 autoDelete(Opt(a)/NODE/X_DECLARE/AUTO_DELETE),
+                                 exclusive(Opt(a)/NODE/X_DECLARE/EXCLUSIVE),
+                                 alternateExchange((Opt(a)/NODE/X_DECLARE/ALTERNATE_EXCHANGE).str())
+{
+    (Opt(a)/NODE/X_DECLARE/ARGUMENTS).collect(arguments);
+    nodeBindings.setDefaultQueue(name);
+    linkBindings.setDefaultQueue(name);
 }
 
 void Queue::checkCreate(qpid::client::AsyncSession& session, CheckMode mode)
@@ -634,14 +684,7 @@ void Queue::checkCreate(qpid::client::As
         } catch (const qpid::Exception& e) {
             throw InvalidAddress((boost::format("Could not create queue %1%; %2%") % name % e.what()).str());
         }
-        try {
-            for (Bindings::const_iterator i = bindings.begin(); i != bindings.end(); ++i) {
-                session.exchangeBind(arg::queue=name, arg::exchange=i->exchange, arg::bindingKey=i->key);
-            }
-            session.sync();
-        } catch (const qpid::Exception& e) {
-            throw InvalidAddress((boost::format("Could not create queue bindings for %1%; %2%") % name % e.what()).str());
-        }
+        nodeBindings.bind(session);
     } else {
         try {
             sync(session).queueDeclare(arg::queue=name, arg::passive=true);
@@ -694,82 +737,38 @@ void Queue::checkAssert(qpid::client::As
                                           % i->first % name % *(i->second) % *v).str());
                 }
             }
-            for (Bindings::const_iterator i = bindings.begin(); i != bindings.end(); ++i) {
-                ExchangeBoundResult result = sync(session).exchangeBound(arg::queue=name, arg::exchange=i->exchange, arg::bindingKey=i->key);
-                if (result.getQueueNotMatched() || result.getKeyNotMatched()) {
-                    throw InvalidAddress((boost::format("Binding %1%/%2% for %3% was not matched") % i->exchange % i->key % name).str());
-                }
-            }
-        }
-    }
-}
-
-void Queue::addBinding(const std::string& b)
-{
-    string::size_type i = b.find('/');
-    if (i == string::npos) {
-        bindings.push_back(Binding(b, EMPTY_STRING));
-    } else {
-        std::string exchange = b.substr(0, i);
-        if (i+1 < b.size()) {
-            bindings.push_back(Binding(exchange, b.substr(i+1)));
-        } else {
-            bindings.push_back(Binding(exchange, EMPTY_STRING));
-        }
-    }
-}
-
-void Queue::addBindings(const Variant::List& list)
-{
-    for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
-        addBinding(i->asString());
-    }
-}
-
-void Queue::configure(const Address& address)
-{
-    const Variant& v = address.getOption(NODE_PROPERTIES);
-    if (!v.isVoid()) {
-        Variant::Map nodeProps = v.asMap();
-        durable = nodeProps[DURABLE];
-        Variant::Map::const_iterator x = nodeProps.find(X_PROPERTIES);
-        if (x != nodeProps.end()) {
-            const Variant::Map& xProps = x->second.asMap();
-            Variant::Map passthrough;
-            for (Variant::Map::const_iterator i = xProps.begin(); i != xProps.end(); ++i) {
-                if (i->first == xamqp::AUTO_DELETE) autoDelete = i->second;
-                else if (i->first == xamqp::EXCLUSIVE) exclusive = i->second;
-                else if (i->first == xamqp::ALTERNATE_EXCHANGE) alternateExchange = i->second.asString();
-                else if (i->first == xamqp::BINDINGS) addBindings(i->second.asList());
-                else passthrough[i->first] = i->second;
-            }
-            translate(passthrough, arguments);
+            nodeBindings.check(session);
         }
     }
 }
 
 Exchange::Exchange(const Address& a) : Node(a),
-                                       type(TOPIC_EXCHANGE),
-                                       typeSpecified(false),
-                                       durable(false),
-                                       autoDelete(false)
-{
-    configure(a);
+                                       specifiedType((Opt(a)/NODE/X_DECLARE/TYPE).str()),
+                                       durable(Opt(a)/NODE/DURABLE),
+                                       autoDelete(Opt(a)/NODE/X_DECLARE/AUTO_DELETE),
+                                       alternateExchange((Opt(a)/NODE/X_DECLARE/ALTERNATE_EXCHANGE).str())
+{
+    (Opt(a)/NODE/X_DECLARE/ARGUMENTS).collect(arguments);
+    nodeBindings.setDefaultExchange(name);
+    linkBindings.setDefaultExchange(name);
 }
 
 void Exchange::checkCreate(qpid::client::AsyncSession& session, CheckMode mode)
 {
     if (enabled(createPolicy, mode)) {
         try {
+            std::string type = specifiedType;
+            if (type.empty()) type = TOPIC_EXCHANGE;
             sync(session).exchangeDeclare(arg::exchange=name,
-                                      arg::type=type,
-                                      arg::durable=durable,
-                                      arg::autoDelete=autoDelete,
-                                      arg::alternateExchange=alternateExchange,
-                                      arg::arguments=arguments);
+                                          arg::type=type,
+                                          arg::durable=durable,
+                                          arg::autoDelete=autoDelete,
+                                          arg::alternateExchange=alternateExchange,
+                                          arg::arguments=arguments);
         } catch (const qpid::Exception& e) {
             throw InvalidAddress((boost::format("Could not create exchange %1%; %2%") % name % e.what()).str());
         }
+        nodeBindings.bind(session);
     } else {
         try {
             sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true);
@@ -800,9 +799,9 @@ void Exchange::checkAssert(qpid::client:
         if (result.getNotFound()) {
             throw InvalidAddress((boost::format("Exchange not found: %1%") % name).str());
         } else {
-            if (typeSpecified && result.getType() != type) {
+            if (specifiedType.size() && result.getType() != specifiedType) {
                 throw InvalidAddress((boost::format("Exchange %1% is of incorrect type, expected %2% but got %3%") 
-                                      % name % type % result.getType()).str());
+                                      % name % specifiedType % result.getType()).str());
             }
             if (durable && !result.getDurable()) {
                 throw InvalidAddress((boost::format("Exchange not durable: %1%") % name).str());
@@ -819,31 +818,79 @@ void Exchange::checkAssert(qpid::client:
                                           % i->first % name % *(i->second) % *v).str());
                 }
             }
+            nodeBindings.check(session);
         }
     }
 }
 
-void Exchange::configure(const Address& address)
+Binding::Binding(const Variant::Map& b) : 
+    exchange((Opt(b)/EXCHANGE).str()),
+    queue((Opt(b)/QUEUE).str()),
+    key((Opt(b)/KEY).str())
 {
-    const Variant& v = address.getOption(NODE_PROPERTIES);
-    if (!v.isVoid()) {
-        Variant::Map nodeProps = v.asMap();
-        durable = nodeProps[DURABLE];
-        Variant::Map::const_iterator x = nodeProps.find(X_PROPERTIES);
-        if (x != nodeProps.end()) {
-            const Variant::Map& xProps = x->second.asMap();
-            Variant::Map passthrough;
-            for (Variant::Map::const_iterator i = xProps.begin(); i != xProps.end(); ++i) {
-                if (i->first == xamqp::AUTO_DELETE) autoDelete = i->second;
-                else if (i->first == xamqp::EXCHANGE_TYPE) { type = i->second.asString(); typeSpecified = true; }
-                else if (i->first == xamqp::ALTERNATE_EXCHANGE) alternateExchange = i->second.asString();
-                else passthrough[i->first] = i->second;
-            }
-            translate(passthrough, arguments);
+    (Opt(b)/ARGUMENTS).collect(arguments);
+}
+
+Binding::Binding(const std::string& e, const std::string& q, const std::string& k) : exchange(e), queue(q), key(k) {}
+
+
+void Bindings::add(const Variant::List& list)
+{
+    for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
+        push_back(Binding(i->asMap()));
+    }
+}
+
+void Bindings::setDefaultExchange(const std::string& exchange)
+{
+    for (Bindings::iterator i = begin(); i != end(); ++i) {
+        if (i->exchange.empty()) i->exchange = exchange;
+    }
+}
+
+void Bindings::setDefaultQueue(const std::string& queue)
+{
+    for (Bindings::iterator i = begin(); i != end(); ++i) {
+        if (i->queue.empty()) i->queue = queue;
+    }
+}
+
+void Bindings::bind(qpid::client::AsyncSession& session)
+{
+    try {        
+        for (Bindings::const_iterator i = begin(); i != end(); ++i) {
+            session.exchangeBind(arg::queue=i->queue,
+                                 arg::exchange=i->exchange,
+                                 arg::bindingKey=i->key,
+                                 arg::arguments=i->arguments);
         }
+        session.sync();
+    } catch (const qpid::Exception& e) {
+        throw InvalidAddress((boost::format("Could not create node bindings: %1%") % e.what()).str());
     }
 }
 
+void Bindings::unbind(qpid::client::AsyncSession& session)
+{
+    for (Bindings::const_iterator i = begin(); i != end(); ++i) {
+        session.exchangeUnbind(arg::queue=i->queue,
+                               arg::exchange=i->exchange,
+                               arg::bindingKey=i->key);
+    }
+}
+
+void Bindings::check(qpid::client::AsyncSession& session)
+{
+    for (Bindings::const_iterator i = begin(); i != end(); ++i) {
+        ExchangeBoundResult result = sync(session).exchangeBound(arg::queue=i->queue, 
+                                                                 arg::exchange=i->exchange,
+                                                                 arg::bindingKey=i->key);
+        if (result.getQueueNotMatched() || result.getKeyNotMatched()) {
+            throw InvalidAddress((boost::format("No such binding [exchange=%1%, queue=%2%, key=%3%]") 
+                                  % i->exchange % i->queue % i->key).str());
+        }
+    }
+}
 
 bool Node::enabled(const Variant& policy, CheckMode mode)
 {

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp?rev=927144&r1=927143&r2=927144&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp Wed Mar 24 18:35:11 2010
@@ -114,7 +114,7 @@ void Address::setOptions(const Variant::
 namespace{
 const Variant EMPTY_VARIANT;
 const std::string EMPTY_STRING;
-const std::string NODE_PROPERTIES="node-properties";
+const std::string NODE_PROPERTIES="node";
 }
 
 const Variant& find(const Variant::Map& map, const std::string& key)

Modified: qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=927144&r1=927143&r2=927144&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp Wed Mar 24 18:35:11 2010
@@ -564,13 +564,13 @@ struct QueueCreatePolicyFixture : public
 
 QPID_AUTO_TEST_CASE(testCreatePolicyQueueAlways)
 {
-    QueueCreatePolicyFixture fix("#; {create:always, node-properties:{type:queue}}");
+    QueueCreatePolicyFixture fix("#; {create:always, node:{type:queue}}");
     fix.test();
 }
 
 QPID_AUTO_TEST_CASE(testCreatePolicyQueueReceiver)
 {
-    QueueCreatePolicyFixture fix("#; {create:receiver, node-properties:{type:queue}}");
+    QueueCreatePolicyFixture fix("#; {create:receiver, node:{type:queue}}");
     Receiver r = fix.session.createReceiver(fix.address);
     fix.test();
     r.close();
@@ -578,7 +578,7 @@ QPID_AUTO_TEST_CASE(testCreatePolicyQueu
 
 QPID_AUTO_TEST_CASE(testCreatePolicyQueueSender)
 {
-    QueueCreatePolicyFixture fix("#; {create:sender, node-properties:{type:queue}}");
+    QueueCreatePolicyFixture fix("#; {create:sender, node:{type:queue}}");
     Sender s = fix.session.createSender(fix.address);
     fix.test();
     s.close();
@@ -608,14 +608,14 @@ struct ExchangeCreatePolicyFixture : pub
 
 QPID_AUTO_TEST_CASE(testCreatePolicyTopic)
 {
-    ExchangeCreatePolicyFixture fix("#; {create:always, node-properties:{type:topic}}",
+    ExchangeCreatePolicyFixture fix("#; {create:always, node:{type:topic}}",
                                   "topic");
     fix.test();
 }
 
 QPID_AUTO_TEST_CASE(testCreatePolicyTopicReceiverFanout)
 {
-    ExchangeCreatePolicyFixture fix("#/my-subject; {create:receiver, node-properties:{type:topic, x-properties:{type:fanout}}}", "fanout");
+    ExchangeCreatePolicyFixture fix("#/my-subject; {create:receiver, node:{type:topic, x-declare:{type:fanout}}}", "fanout");
     Receiver r = fix.session.createReceiver(fix.address);
     fix.test();
     r.close();
@@ -623,7 +623,7 @@ QPID_AUTO_TEST_CASE(testCreatePolicyTopi
 
 QPID_AUTO_TEST_CASE(testCreatePolicyTopicSenderDirect)
 {
-    ExchangeCreatePolicyFixture fix("#/my-subject; {create:sender, node-properties:{type:topic, x-properties:{type:direct}}}", "direct");
+    ExchangeCreatePolicyFixture fix("#/my-subject; {create:sender, node:{type:topic, x-declare:{type:direct}}}", "direct");
     Sender s = fix.session.createSender(fix.address);
     fix.test();
     s.close();
@@ -746,18 +746,18 @@ QPID_AUTO_TEST_CASE(testDeletePolicyExch
 QPID_AUTO_TEST_CASE(testAssertPolicyQueue)
 {
     MessagingFixture fix;
-    std::string a1 = "q; {create:always, assert:always, node-properties:{type:queue, durable:false, x-properties:{qpid.max-count:100}}}";
+    std::string a1 = "q; {create:always, assert:always, node:{type:queue, durable:false, x-declare:{arguments:{qpid.max-count:100}}}}";
     Sender s1 = fix.session.createSender(a1);
     s1.close();
     Receiver r1 = fix.session.createReceiver(a1);
     r1.close();
     
-    std::string a2 = "q; {assert:receiver, node-properties:{durable:true, x-properties:{qpid.max-count:100}}}";
+    std::string a2 = "q; {assert:receiver, node:{durable:true, x-declare:{arguments:{qpid.max-count:100}}}}";
     Sender s2 = fix.session.createSender(a2);
     s2.close();
     BOOST_CHECK_THROW(fix.session.createReceiver(a2), qpid::messaging::InvalidAddress);
 
-    std::string a3 = "q; {assert:sender, node-properties:{x-properties:{qpid.max-count:99}}}";
+    std::string a3 = "q; {assert:sender, node:{x-declare:{arguments:{qpid.max-count:99}}}}";
     BOOST_CHECK_THROW(fix.session.createSender(a3), qpid::messaging::InvalidAddress);
     Receiver r3 = fix.session.createReceiver(a3);
     r3.close();



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org