You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2010/03/24 19:35:12 UTC
svn commit: r927144 - in /qpid/trunk/qpid/cpp: examples/messaging/readme.txt
include/qpid/messaging/Address.h
src/qpid/client/amqp0_10/AddressResolution.cpp
src/qpid/messaging/Address.cpp src/tests/MessagingSessionTests.cpp
Author: gsim
Date: Wed Mar 24 18:35:11 2010
New Revision: 927144
URL: http://svn.apache.org/viewvc?rev=927144&view=rev
Log:
QPID-664: Updates to address options to stay in-line with python client changes in r926604
Modified:
qpid/trunk/qpid/cpp/examples/messaging/readme.txt
qpid/trunk/qpid/cpp/include/qpid/messaging/Address.h
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp
qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
Modified: qpid/trunk/qpid/cpp/examples/messaging/readme.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/examples/messaging/readme.txt?rev=927144&r1=927143&r2=927144&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/examples/messaging/readme.txt (original)
+++ qpid/trunk/qpid/cpp/examples/messaging/readme.txt Wed Mar 24 18:35:11 2010
@@ -96,7 +96,7 @@ of address (as there is no existing enti
type and as we do not want the default type to be created, namely a
queue):
-* run: ./drain -f --address 'my-new-topic; {create: always, node-properties:{type:topic}}'
+* run: ./drain -f --address 'my-new-topic; {create: always, node:{type:topic}}'
* then run: ./spout --address my-new-queue
The value to the create policy is one of always, sender, receiver or
@@ -128,19 +128,17 @@ qpid-config or even auto-create one):
An example using xquery based filtering with the xml exchange:
* First start a subscriber with an xquery filter specified:
- ./drain -f --address 'xml/my-subject; {filter:{xquery:"declare variable $colour external; $colour = '\''red'\''"}}'
+ ./drain -f --address 'xml; {link:{x-bindings:[{arguments:{xquery:"declare variable $colour external; $colour = '\''red'\''"}}]}}'
* Then test receipt of messages that match the xquery filter:
- ./spout --address 'xml/my-subject' --property colour=red --content 'matched!'
+ ./spout --address 'xml' --property colour=red --content 'matched!'
and
- ./spout --address 'xml/my-subject' --property colour=blue --content 'not matched'
+ ./spout --address 'xml' --property colour=blue --content 'not matched'
TODO:
* auto-creating exchanges of different types
-* xml content in the xquery example
-
* 'durable' and 'reliable' subscriptions
* map content
Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Address.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Address.h?rev=927144&r1=927143&r2=927144&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Address.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Address.h Wed Mar 24 18:35:11 2010
@@ -65,82 +65,57 @@ class AddressImpl;
*
* <table border=0>
*
- * <tr valign=top><td>create</td><td>Indicate whether the address should be
- * automatically created or not. Can be one of <i>always</i>,
- * <i>never</i>, <i>sender</i> or <i>receiver</i>. The properties of
- * the node to be created can be specified via the node-properties
- * option (see below).</td></tr>
- *
- * <tr valign=top><td>assert</td><td>Indicate whether or not to assert any specified
- * node-properties match the address. Can be one of <i>always</i>,
- * <i>never</i>, <i>sender</i> or <i>receiver</i>.</td></tr>
- *
- * <tr valign=top><td>delete</td><td>Indicate whether or not to delete the addressed
- * nide when a sender or receiver is cancelled. Can be one of <i>always</i>,
- * <i>never</i>, <i>sender</i> or <i>receiver</i>.</td></tr>
+ * <tr valign=top>
+ * <td>create</td>
+ * <td>Indicate whether the address should be automatically created
+ * or not. Can be one of <i>always</i>, <i>never</i>,
+ * <i>sender</i> or <i>receiver</i>. The properties of the node
+ * to be created can be specified via the node options (see
+ * below).
+ * </td>
+ * </tr>
+ *
+ * <tr valign=top>
+ * <td>assert</td>
+ * <td>Indicate whether or not to assert any specified node
+ * properties (see below) match the address. Can be one of
+ * <i>always</i>, <i>never</i>, <i>sender</i> or
+ * <i>receiver</i>.
+ * </td>
+ * </tr>
+ *
+ * <tr valign=top>
+ * <td>delete</td>
+ * <td>Indicate whether or not to delete the addressed node when a
+ * sender or receiver is cancelled. Can be one of <i>always</i>,
+ * <i>never</i>, <i>sender</i> or <i>receiver</i>.
+ * </td>
+ * </tr>
*
- * <tr valign=top><td>reliability</td><td>indicates the level of
- * reliability expected. Can be one of unreliable, at-most-once,
- * at-least-once or exactly-once (the latter is not yet correctly
- * supported).</td></tr>
- *
- * <tr valign=top><td>node-properties</td><td>A nested map of properties of the addressed
- * entity or 'node'. These can be used when automatically creating it,
- * or to assert certain properties.
- *
- * The valid node-properties are:
- * <ul>
- * <li>type - queue or topic</li>
- *
- * <li>durable - true or false</li>
- *
- * <li>x-properties - a nested map that can contain implementation or
- * protocol specifiec extedned properties. For the amqp 0-10 mapping,
- * the fields in queue- or exchange- declare can be specified in here;
- * a bindings entry may also be specified, whose value should be an
- * array of strings of the form exchange/key; anything else will be
- * passed through in the arguments field.
- * </li>
- * </ul>
- * </td></tr>
- *
- * </table>
+ * <tr valign=top>
+ * <td>node</td>
+ * <td>A nested map describing properties of the addressed
+ * node. Current properties supported are type (topic or queue),
+ * durable (boolean), x-declare and x-bindings.
+ * </td>
+ * </tr>
+ *
+ * <tr valign=top>
+ * <td>link</td>
+ * <td>A nested map through which properties of the 'link' from
+ * sender/receiver to node can be configured. Current properties
+ * are name, durable, reliability, x-declare, x-subscribe and
+ * x-bindings.
+ * </td>
+ * </tr>
*
- * For receivers there are some further options of interest:
+ * For receivers there is one other option of interest:
*
* <table border=0 valign=top>
- *
- * <tr valign=top><td>no-local</td><td>(only relevant for topics at present) specifies that the
- * receiver does not want to receiver messages published to the topic
- * that originate from a sender on the same connection</td></tr>
- *
* <tr valign=top><td>mode</td><td>(only relevant for queues)
* indicates whether the subscribe should consume (the default) or
* merely browse the messages. Valid values are 'consume' and
* 'browse'</td></tr>
- *
- * <tr valign=top><td>durable</td><td>(only relevant for topics at present) specifies that a
- * durable subscription is required</td></tr>
- *
- * <tr valign=top><td>filter</td><td>(only relevant for topics at present) allows bindings to
- * be created for the queue that match the given criteria (or list of
- * criteria).</td></tr>
- *
- * <tr valign=top><td>x-properties</td><td>allows protocol or implementation specific options
- * to be specified for a receiver; this is a nested map and currently
- * the implementation only recognises two specific nested properties
- * within it (all others are passed through in the arguments of the
- * message-subscribe command):
- *
- * <ul>
- * <li>exclusive, which requests an exclusive subscription and
- * is only relevant for queues</li>
- *
- * <li>x-queue-arguments, which is only relevant for topics and
- * allows arguments to the queue-declare for the subscription
- * queue to be specified</li>
- * </ul>
- * </td></tr>
* </table>
*/
class Address
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp?rev=927144&r1=927143&r2=927144&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp Wed Mar 24 18:35:11 2010
@@ -61,40 +61,54 @@ using namespace boost::assign;
namespace{
const Variant EMPTY_VARIANT;
const FieldTable EMPTY_FIELD_TABLE;
+ const Variant::List EMPTY_LIST;
const std::string EMPTY_STRING;
-//option names
-const std::string BROWSE("browse");
-const std::string CONSUME("consume");
-const std::string EXCLUSIVE("exclusive");
-const std::string MODE("mode");
-const std::string NO_LOCAL("no-local");
-const std::string FILTER("filter");
-const std::string RELIABILITY("reliability");
-const std::string NAME("subscription-name");
-const std::string NODE_PROPERTIES("node-properties");
-const std::string X_PROPERTIES("x-properties");
-
//policy types
const std::string CREATE("create");
const std::string ASSERT("assert");
const std::string DELETE("delete");
+
+//option names
+const std::string NODE("node");
+const std::string LINK("link");
+const std::string MODE("mode");
+const std::string RELIABILITY("reliability");
+const std::string NAME("name");
+const std::string DURABLE("durable");
+const std::string X_DECLARE("x-declare");
+const std::string X_SUBSCRIBE("x-subscribe");
+const std::string X_BINDINGS("x-bindings");
+const std::string EXCHANGE("exchange");
+const std::string QUEUE("queue");
+const std::string KEY("key");
+const std::string ARGUMENTS("arguments");
+const std::string ALTERNATE_EXCHANGE("alternate-exchange");
+const std::string TYPE("type");
+const std::string EXCLUSIVE("exclusive");
+const std::string AUTO_DELETE("auto-delete");
+
//policy values
const std::string ALWAYS("always");
const std::string NEVER("never");
const std::string RECEIVER("receiver");
const std::string SENDER("sender");
+//address types
const std::string QUEUE_ADDRESS("queue");
const std::string TOPIC_ADDRESS("topic");
+//reliability options:
const std::string UNRELIABLE("unreliable");
const std::string AT_MOST_ONCE("at-most-once");
const std::string AT_LEAST_ONCE("at-least-once");
const std::string EXACTLY_ONCE("exactly-once");
-const std::string DURABLE_SUBSCRIPTION("durable");
-const std::string DURABLE("durable");
+//receiver modes:
+const std::string BROWSE("browse");
+const std::string CONSUME("consume");
+
+//0-10 exchange types:
const std::string TOPIC_EXCHANGE("topic");
const std::string FANOUT_EXCHANGE("fanout");
const std::string DIRECT_EXCHANGE("direct");
@@ -103,16 +117,26 @@ const std::string XML_EXCHANGE("xml");
const std::string WILDCARD_ANY("*");
}
-//some amqp 0-10 specific options
-namespace xamqp{
-const std::string AUTO_DELETE("auto-delete");
-const std::string EXCHANGE_TYPE("type");
-const std::string EXCLUSIVE("exclusive");
-const std::string ALTERNATE_EXCHANGE("alternate-exchange");
-const std::string QUEUE_ARGUMENTS("x-queue-arguments");
-const std::string SUBSCRIBE_ARGUMENTS("x-subscribe-arguments");
-const std::string BINDINGS("bindings");
-}
+struct Binding
+{
+ Binding(const Variant::Map&);
+ Binding(const std::string& exchange, const std::string& queue, const std::string& key);
+
+ std::string exchange;
+ std::string queue;
+ std::string key;
+ FieldTable arguments;
+};
+
+struct Bindings : std::vector<Binding>
+{
+ void add(const Variant::List& bindings);
+ void setDefaultExchange(const std::string&);
+ void setDefaultQueue(const std::string&);
+ void bind(qpid::client::AsyncSession& session);
+ void unbind(qpid::client::AsyncSession& session);
+ void check(qpid::client::AsyncSession& session);
+};
class Node
{
@@ -125,6 +149,8 @@ class Node
Variant createPolicy;
Variant assertPolicy;
Variant deletePolicy;
+ Bindings nodeBindings;
+ Bindings linkBindings;
static bool enabled(const Variant& policy, CheckMode mode);
static bool createEnabled(const Address& address, CheckMode mode);
@@ -133,17 +159,6 @@ class Node
static std::vector<std::string> SENDER_MODES;
};
-struct Binding
-{
- Binding(const std::string& exchange, const std::string& key, const FieldTable& options = EMPTY_FIELD_TABLE);
-
- std::string exchange;
- std::string key;
- FieldTable options;
-};
-
-typedef std::vector<Binding> Bindings;
-
class Queue : protected Node
{
@@ -154,16 +169,11 @@ class Queue : protected Node
void checkAssert(qpid::client::AsyncSession&, CheckMode);
void checkDelete(qpid::client::AsyncSession&, CheckMode);
private:
- bool durable;
- bool autoDelete;
- bool exclusive;
- std::string alternateExchange;
+ const bool durable;
+ const bool autoDelete;
+ const bool exclusive;
+ const std::string alternateExchange;
FieldTable arguments;
- Bindings bindings;
-
- void configure(const Address&);
- void addBindings(const Variant::List&);
- void addBinding(const std::string&);
};
class Exchange : protected Node
@@ -174,17 +184,14 @@ class Exchange : protected Node
void checkCreate(qpid::client::AsyncSession&, CheckMode);
void checkAssert(qpid::client::AsyncSession&, CheckMode);
void checkDelete(qpid::client::AsyncSession&, CheckMode);
- const std::string& getDesiredExchangeType() { return type; }
+ protected:
+ const std::string specifiedType;
private:
- std::string type;
- bool typeSpecified;
- bool durable;
- bool autoDelete;
- std::string alternateExchange;
- FieldTable arguments;
-
- void configure(const Address&);
+ const bool durable;
+ const bool autoDelete;
+ const std::string alternateExchange;
+ FieldTable arguments;
};
class QueueSource : public Queue, public MessageSource
@@ -203,24 +210,22 @@ class QueueSource : public Queue, public
class Subscription : public Exchange, public MessageSource
{
public:
- Subscription(const Address&, const std::string& exchangeType="");
+ Subscription(const Address&, const std::string& actualType);
void subscribe(qpid::client::AsyncSession& session, const std::string& destination);
void cancel(qpid::client::AsyncSession& session, const std::string& destination);
private:
const std::string queue;
const bool reliable;
const bool durable;
+ const std::string actualType;
FieldTable queueOptions;
FieldTable subscriptionOptions;
Bindings bindings;
- void bindSpecial(const std::string& exchangeType);
- void bind(const std::string& subject);
- void bind(const std::string& subject, const Variant& filter);
- void bind(const std::string& subject, const Variant::Map& filter);
- void bind(const std::string& subject, const Variant::List& filter);
- void add(const std::string& exchange, const std::string& key, const FieldTable& options = EMPTY_FIELD_TABLE);
- static std::string getSubscriptionName(const std::string& base, const Variant& name);
+ void bindSubject(const std::string& subject);
+ void bindAll();
+ void add(const std::string& exchange, const std::string& key);
+ static std::string getSubscriptionName(const std::string& base, const std::string& name);
};
class ExchangeSink : public Exchange, public MessageSink
@@ -267,14 +272,102 @@ bool getSenderPolicy(const Address& addr
return in(address.getOption(key), list_of<std::string>(ALWAYS)(SENDER));
}
+const Variant& getOption(const Variant::Map& options, const std::vector<std::string>& path, size_t index=0)
+{
+ Variant::Map::const_iterator j = options.find(path[index]);
+ if (j == options.end()) {
+ return EMPTY_VARIANT;
+ } else if (++index < path.size()) {
+ if (j->second.getType() != qpid::messaging::VAR_MAP)
+ throw InvalidAddress((boost::format("Expected %1% to be a map") % j->first).str());
+ return getOption(j->second.asMap(), path, index);
+ } else {
+ return j->second;
+ }
+}
+
+const Variant& getOption(const Address& address, const std::vector<std::string>& path)
+{
+ return getOption(address.getOptions(), path);
+}
+
+const Variant& getOption(const Variant::Map& options, const std::string& name)
+{
+ Variant::Map::const_iterator j = options.find(name);
+ if (j == options.end()) {
+ return EMPTY_VARIANT;
+ } else {
+ return j->second;
+ }
+}
+
+struct Opt
+{
+ Opt(const Address& address);
+ Opt(const Variant::Map& base);
+ Opt& operator/(const std::string& name);
+ operator bool() const;
+ std::string str() const;
+ const Variant::List& asList() const;
+ void collect(qpid::framing::FieldTable& args) const;
+
+ const Variant::Map* options;
+ const Variant* value;
+};
+
+Opt::Opt(const Address& address) : options(&(address.getOptions())), value(0) {}
+Opt::Opt(const Variant::Map& base) : options(&base), value(0) {}
+Opt& Opt::operator/(const std::string& name)
+{
+ if (options) {
+ Variant::Map::const_iterator j = options->find(name);
+ if (j == options->end()) {
+ value = 0;
+ options = 0;
+ } else {
+ value = &(j->second);
+ if (value->getType() == qpid::messaging::VAR_MAP) options = &(value->asMap());
+ else options = 0;
+ }
+ }
+ return *this;
+}
+
+
+Opt::operator bool() const
+{
+ return value && !value->isVoid() && value->asBool();
+}
+
+std::string Opt::str() const
+{
+ if (value) return value->asString();
+ else return EMPTY_STRING;
+}
+
+const Variant::List& Opt::asList() const
+{
+ if (value) return value->asList();
+ else return EMPTY_LIST;
+}
+
+void Opt::collect(qpid::framing::FieldTable& args) const
+{
+ if (value) {
+ translate(value->asMap(), args);
+ }
+}
+
bool AddressResolution::is_unreliable(const Address& address)
{
- return in(address.getOption(RELIABILITY), list_of<std::string>(UNRELIABLE)(AT_MOST_ONCE));
+ return in(getOption(address, list_of<std::string>(LINK)(RELIABILITY)),
+ list_of<std::string>(UNRELIABLE)(AT_MOST_ONCE));
}
bool AddressResolution::is_reliable(const Address& address)
{
- return in(address.getOption(RELIABILITY), list_of<std::string>(AT_LEAST_ONCE)(EXACTLY_ONCE));
+ return in(getOption(address, list_of<std::string>(LINK)(RELIABILITY)),
+ list_of<std::string>(AT_LEAST_ONCE)(EXACTLY_ONCE));
}
std::string checkAddressType(qpid::client::Session session, const Address& address)
@@ -282,7 +375,7 @@ std::string checkAddressType(qpid::clien
if (address.getName().empty()) {
throw InvalidAddress("Name cannot be null");
}
- std::string type = address.getType();
+ std::string type = (Opt(address)/NODE/TYPE).str();
if (type.empty()) {
ExchangeBoundResult result = session.exchangeBound(arg::exchange=address.getName(), arg::queue=address.getName());
if (result.getQueueNotFound() && result.getExchangeNotFound()) {
@@ -307,7 +400,8 @@ std::auto_ptr<MessageSource> AddressReso
{
std::string type = checkAddressType(session, address);
if (type == TOPIC_ADDRESS) {
- std::auto_ptr<MessageSource> source(new Subscription(address));
+ std::string exchangeType = sync(session).exchangeQuery(address.getName()).getType();
+ std::auto_ptr<MessageSource> source(new Subscription(address, exchangeType));
QPID_LOG(debug, "treating source address as topic: " << address);
return source;
} else if (type == QUEUE_ADDRESS) {
@@ -337,18 +431,6 @@ std::auto_ptr<MessageSink> AddressResolu
}
}
-const Variant& getNestedOption(const Variant::Map& options, const std::vector<std::string>& keys, size_t index = 0)
-{
- Variant::Map::const_iterator i = options.find(keys[index]);
- if (i == options.end()) {
- return EMPTY_VARIANT;
- } else if (index+1 < keys.size()) {
- return getNestedOption(i->second.asMap(), keys, index+1);
- } else {
- return i->second;
- }
-}
-
bool isBrowse(const Address& address)
{
const Variant& mode = address.getOption(MODE);
@@ -366,23 +448,18 @@ QueueSource::QueueSource(const Address&
acquireMode(isBrowse(address) ? ACQUIRE_MODE_NOT_ACQUIRED : ACQUIRE_MODE_PRE_ACQUIRED),
exclusive(false)
{
- //extract subscription arguments from address options
- const Variant& x = address.getOption(X_PROPERTIES);
- if (!x.isVoid()) {
- const Variant::Map& xProps = x.asMap();
- Variant::Map passthrough;
- for (Variant::Map::const_iterator i = xProps.begin(); i != xProps.end(); ++i) {
- if (i->first == xamqp::EXCLUSIVE) exclusive = i->second;
- else passthrough[i->first] = i->second;
- }
- translate(passthrough, options);
- }
+ //extract subscription arguments from address options (nb: setting
+ //of accept-mode/acquire-mode/destination controlled through other
+ //options)
+ exclusive = Opt(address)/NODE/LINK/X_SUBSCRIBE/EXCLUSIVE;
+ (Opt(address)/NODE/LINK/X_SUBSCRIBE/ARGUMENTS).collect(options);
}
void QueueSource::subscribe(qpid::client::AsyncSession& session, const std::string& destination)
{
checkCreate(session, FOR_RECEIVER);
checkAssert(session, FOR_RECEIVER);
+ linkBindings.bind(session);
session.messageSubscribe(arg::queue=name,
arg::destination=destination,
arg::acceptMode=acceptMode,
@@ -393,58 +470,72 @@ void QueueSource::subscribe(qpid::client
void QueueSource::cancel(qpid::client::AsyncSession& session, const std::string& destination)
{
+ linkBindings.unbind(session);
session.messageCancel(destination);
checkDelete(session, FOR_RECEIVER);
}
-std::string Subscription::getSubscriptionName(const std::string& base, const Variant& name)
+std::string Subscription::getSubscriptionName(const std::string& base, const std::string& name)
{
- if (name.isVoid()) {
+ if (name.empty()) {
return (boost::format("%1%_%2%") % base % Uuid(true).str()).str();
} else {
- return (boost::format("%1%_%2%") % base % name.asString()).str();
+ return (boost::format("%1%_%2%") % base % name).str();
}
}
-Subscription::Subscription(const Address& address, const std::string& exchangeType)
+Subscription::Subscription(const Address& address, const std::string& type)
: Exchange(address),
- queue(getSubscriptionName(name, address.getOption(NAME))),
+ queue(getSubscriptionName(name, (Opt(address)/LINK/NAME).str())),
reliable(AddressResolution::is_reliable(address)),
- durable(address.getOption(DURABLE_SUBSCRIPTION).asBool())
+ durable(Opt(address)/LINK/DURABLE),
+ actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type)
{
- if (address.getOption(NO_LOCAL).asBool()) queueOptions.setInt(NO_LOCAL, 1);
- const Variant& x = address.getOption(X_PROPERTIES);
- if (!x.isVoid()) {
- const Variant::Map& xProps = x.asMap();
- Variant::Map passthrough;
- for (Variant::Map::const_iterator i = xProps.begin(); i != xProps.end(); ++i) {
- if (i->first == xamqp::QUEUE_ARGUMENTS) convert(i->second.asMap(), queueOptions);
- else passthrough[i->first] = i->second;
- }
- translate(passthrough, subscriptionOptions);
- }
+ (Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions);
+ (Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions);
- const Variant& filter = address.getOption(FILTER);
- if (!filter.isVoid()) {
- bind(address.getSubject(), filter);
- } else if (address.hasSubject()) {
- //Note: This will not work for headers- or xml- exchange;
- //fanout exchange will do no filtering.
- //TODO: for headers- or xml- exchange can construct a match
- //for the subject in the application-headers
- bind(address.getSubject());
+ if (!address.getSubject().empty()) bindSubject(address.getSubject());
+ else if (linkBindings.empty()) bindAll();
+}
+
+void Subscription::bindSubject(const std::string& subject)
+{
+ if (actualType == HEADERS_EXCHANGE) {
+ Binding b(name, queue, subject);
+ b.arguments.setString("qpid.subject", subject);
+ b.arguments.setString("x-match", "all");
+ bindings.push_back(b);
+ } else if (actualType == XML_EXCHANGE) {
+ Binding b(name, queue, subject);
+ std::string query = (boost::format("declare variable $qpid.subject external; $qpid.subject = '%1%'")
+ % subject).str();
+ b.arguments.setString("xquery", query);
+ bindings.push_back(b);
} else {
- //Neither a subject nor a filter has been defined, treat this
- //as wanting to match all messages (Note: direct exchange is
- //currently unable to support this case).
- if (!exchangeType.empty()) bindSpecial(exchangeType);
- else if (!getDesiredExchangeType().empty()) bindSpecial(getDesiredExchangeType());
+ //Note: the fanout exchange doesn't support any filtering, so
+ //the subject is ignored in that case
+ add(name, subject);
}
}
-void Subscription::add(const std::string& exchange, const std::string& key, const FieldTable& options)
+void Subscription::bindAll()
{
- bindings.push_back(Binding(exchange, key, options));
+ if (actualType == TOPIC_EXCHANGE) {
+ add(name, WILDCARD_ANY);
+ } else if (actualType == FANOUT_EXCHANGE) {
+ add(name, queue);
+ } else if (actualType == HEADERS_EXCHANGE) {
+ Binding b(name, queue, "match-all");
+ b.arguments.setString("x-match", "all");
+ bindings.push_back(b);
+ } else { //E.g. direct and xml
+ throw qpid::Exception(QPID_MSG("Cannot create binding to match all messages for exchange of type " << actualType));
+ }
+}
+
+void Subscription::add(const std::string& exchange, const std::string& key)
+{
+ bindings.push_back(Binding(exchange, queue, key));
}
void Subscription::subscribe(qpid::client::AsyncSession& session, const std::string& destination)
@@ -456,10 +547,11 @@ void Subscription::subscribe(qpid::clien
//create subscription queue:
session.queueDeclare(arg::queue=queue, arg::exclusive=true,
arg::autoDelete=!reliable, arg::durable=durable, arg::arguments=queueOptions);
- //bind subscription queue to exchange:
- for (Bindings::const_iterator i = bindings.begin(); i != bindings.end(); ++i) {
- session.exchangeBind(arg::queue=queue, arg::exchange=i->exchange, arg::bindingKey=i->key, arg::arguments=i->options);
- }
+ //'default' binding:
+ bindings.bind(session);
+ //any explicit bindings:
+ linkBindings.setDefaultQueue(queue);
+ linkBindings.bind(session);
//subscribe to subscription queue:
AcceptMode accept = reliable ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE;
session.messageSubscribe(arg::queue=queue, arg::destination=destination,
@@ -468,20 +560,19 @@ void Subscription::subscribe(qpid::clien
void Subscription::cancel(qpid::client::AsyncSession& session, const std::string& destination)
{
+ linkBindings.unbind(session);
session.messageCancel(destination);
session.queueDelete(arg::queue=queue);
checkDelete(session, FOR_RECEIVER);
}
-Binding::Binding(const std::string& e, const std::string& k, const FieldTable& o):
- exchange(e), key(k), options(o) {}
-
ExchangeSink::ExchangeSink(const Address& address) : Exchange(address) {}
void ExchangeSink::declare(qpid::client::AsyncSession& session, const std::string&)
{
checkCreate(session, FOR_SENDER);
checkAssert(session, FOR_SENDER);
+ linkBindings.bind(session);
}
void ExchangeSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
@@ -492,6 +583,7 @@ void ExchangeSink::send(qpid::client::As
void ExchangeSink::cancel(qpid::client::AsyncSession& session, const std::string&)
{
+ linkBindings.unbind(session);
checkDelete(session, FOR_SENDER);
}
@@ -501,6 +593,7 @@ void QueueSink::declare(qpid::client::As
{
checkCreate(session, FOR_SENDER);
checkAssert(session, FOR_SENDER);
+ linkBindings.bind(session);
}
void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
{
@@ -510,6 +603,7 @@ void QueueSink::send(qpid::client::Async
void QueueSink::cancel(qpid::client::AsyncSession& session, const std::string&)
{
+ linkBindings.unbind(session);
checkDelete(session, FOR_SENDER);
}
@@ -556,68 +650,24 @@ bool isTopic(qpid::client::Session sessi
}
}
-void Subscription::bind(const std::string& subject)
-{
- add(name, subject);
-}
-
-void Subscription::bind(const std::string& subject, const Variant& filter)
-{
- switch (filter.getType()) {
- case qpid::messaging::VAR_MAP:
- bind(subject, filter.asMap());
- break;
- case qpid::messaging::VAR_LIST:
- bind(subject, filter.asList());
- break;
- default:
- //TODO: if both subject _and_ filter are specified, combine in
- //some way; for now we just ignore the subject in that case.
- add(name, filter.asString());
- break;
- }
-}
-
-void Subscription::bind(const std::string& subject, const Variant::Map& filter)
-{
- qpid::framing::FieldTable arguments;
- translate(filter, arguments);
- add(name, subject.empty() ? queue : subject, arguments);
-}
-
-void Subscription::bind(const std::string& subject, const Variant::List& filter)
-{
- for (Variant::List::const_iterator i = filter.begin(); i != filter.end(); ++i) {
- bind(subject, *i);
- }
-}
-
-void Subscription::bindSpecial(const std::string& exchangeType)
-{
- if (exchangeType == TOPIC_EXCHANGE) {
- add(name, WILDCARD_ANY);
- } else if (exchangeType == FANOUT_EXCHANGE) {
- add(name, queue);
- } else if (exchangeType == HEADERS_EXCHANGE) {
- //TODO: add special binding for headers exchange to match all messages
- } else if (exchangeType == XML_EXCHANGE) {
- //TODO: add special binding for xml exchange to match all messages
- } else { //E.g. direct
- throw qpid::Exception(QPID_MSG("Cannot create binding to match all messages for exchange of type " << exchangeType));
- }
-}
-
Node::Node(const Address& address) : name(address.getName()),
createPolicy(address.getOption(CREATE)),
assertPolicy(address.getOption(ASSERT)),
- deletePolicy(address.getOption(DELETE)) {}
+ deletePolicy(address.getOption(DELETE))
+{
+ nodeBindings.add((Opt(address)/NODE/X_BINDINGS).asList());
+ linkBindings.add((Opt(address)/LINK/X_BINDINGS).asList());
+}
Queue::Queue(const Address& a) : Node(a),
- durable(false),
- autoDelete(false),
- exclusive(false)
-{
- configure(a);
+ durable(Opt(a)/NODE/DURABLE),
+ autoDelete(Opt(a)/NODE/X_DECLARE/AUTO_DELETE),
+ exclusive(Opt(a)/NODE/X_DECLARE/EXCLUSIVE),
+ alternateExchange((Opt(a)/NODE/X_DECLARE/ALTERNATE_EXCHANGE).str())
+{
+ (Opt(a)/NODE/X_DECLARE/ARGUMENTS).collect(arguments);
+ nodeBindings.setDefaultQueue(name);
+ linkBindings.setDefaultQueue(name);
}
void Queue::checkCreate(qpid::client::AsyncSession& session, CheckMode mode)
@@ -634,14 +684,7 @@ void Queue::checkCreate(qpid::client::As
} catch (const qpid::Exception& e) {
throw InvalidAddress((boost::format("Could not create queue %1%; %2%") % name % e.what()).str());
}
- try {
- for (Bindings::const_iterator i = bindings.begin(); i != bindings.end(); ++i) {
- session.exchangeBind(arg::queue=name, arg::exchange=i->exchange, arg::bindingKey=i->key);
- }
- session.sync();
- } catch (const qpid::Exception& e) {
- throw InvalidAddress((boost::format("Could not create queue bindings for %1%; %2%") % name % e.what()).str());
- }
+ nodeBindings.bind(session);
} else {
try {
sync(session).queueDeclare(arg::queue=name, arg::passive=true);
@@ -694,82 +737,38 @@ void Queue::checkAssert(qpid::client::As
% i->first % name % *(i->second) % *v).str());
}
}
- for (Bindings::const_iterator i = bindings.begin(); i != bindings.end(); ++i) {
- ExchangeBoundResult result = sync(session).exchangeBound(arg::queue=name, arg::exchange=i->exchange, arg::bindingKey=i->key);
- if (result.getQueueNotMatched() || result.getKeyNotMatched()) {
- throw InvalidAddress((boost::format("Binding %1%/%2% for %3% was not matched") % i->exchange % i->key % name).str());
- }
- }
- }
- }
-}
-
-void Queue::addBinding(const std::string& b)
-{
- string::size_type i = b.find('/');
- if (i == string::npos) {
- bindings.push_back(Binding(b, EMPTY_STRING));
- } else {
- std::string exchange = b.substr(0, i);
- if (i+1 < b.size()) {
- bindings.push_back(Binding(exchange, b.substr(i+1)));
- } else {
- bindings.push_back(Binding(exchange, EMPTY_STRING));
- }
- }
-}
-
-void Queue::addBindings(const Variant::List& list)
-{
- for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
- addBinding(i->asString());
- }
-}
-
-void Queue::configure(const Address& address)
-{
- const Variant& v = address.getOption(NODE_PROPERTIES);
- if (!v.isVoid()) {
- Variant::Map nodeProps = v.asMap();
- durable = nodeProps[DURABLE];
- Variant::Map::const_iterator x = nodeProps.find(X_PROPERTIES);
- if (x != nodeProps.end()) {
- const Variant::Map& xProps = x->second.asMap();
- Variant::Map passthrough;
- for (Variant::Map::const_iterator i = xProps.begin(); i != xProps.end(); ++i) {
- if (i->first == xamqp::AUTO_DELETE) autoDelete = i->second;
- else if (i->first == xamqp::EXCLUSIVE) exclusive = i->second;
- else if (i->first == xamqp::ALTERNATE_EXCHANGE) alternateExchange = i->second.asString();
- else if (i->first == xamqp::BINDINGS) addBindings(i->second.asList());
- else passthrough[i->first] = i->second;
- }
- translate(passthrough, arguments);
+ nodeBindings.check(session);
}
}
}
Exchange::Exchange(const Address& a) : Node(a),
- type(TOPIC_EXCHANGE),
- typeSpecified(false),
- durable(false),
- autoDelete(false)
-{
- configure(a);
+ specifiedType((Opt(a)/NODE/X_DECLARE/TYPE).str()),
+ durable(Opt(a)/NODE/DURABLE),
+ autoDelete(Opt(a)/NODE/X_DECLARE/AUTO_DELETE),
+ alternateExchange((Opt(a)/NODE/X_DECLARE/ALTERNATE_EXCHANGE).str())
+{
+ (Opt(a)/NODE/X_DECLARE/ARGUMENTS).collect(arguments);
+ nodeBindings.setDefaultExchange(name);
+ linkBindings.setDefaultExchange(name);
}
void Exchange::checkCreate(qpid::client::AsyncSession& session, CheckMode mode)
{
if (enabled(createPolicy, mode)) {
try {
+ std::string type = specifiedType;
+ if (type.empty()) type = TOPIC_EXCHANGE;
sync(session).exchangeDeclare(arg::exchange=name,
- arg::type=type,
- arg::durable=durable,
- arg::autoDelete=autoDelete,
- arg::alternateExchange=alternateExchange,
- arg::arguments=arguments);
+ arg::type=type,
+ arg::durable=durable,
+ arg::autoDelete=autoDelete,
+ arg::alternateExchange=alternateExchange,
+ arg::arguments=arguments);
} catch (const qpid::Exception& e) {
throw InvalidAddress((boost::format("Could not create exchange %1%; %2%") % name % e.what()).str());
}
+ nodeBindings.bind(session);
} else {
try {
sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true);
@@ -800,9 +799,9 @@ void Exchange::checkAssert(qpid::client:
if (result.getNotFound()) {
throw InvalidAddress((boost::format("Exchange not found: %1%") % name).str());
} else {
- if (typeSpecified && result.getType() != type) {
+ if (specifiedType.size() && result.getType() != specifiedType) {
throw InvalidAddress((boost::format("Exchange %1% is of incorrect type, expected %2% but got %3%")
- % name % type % result.getType()).str());
+ % name % specifiedType % result.getType()).str());
}
if (durable && !result.getDurable()) {
throw InvalidAddress((boost::format("Exchange not durable: %1%") % name).str());
@@ -819,31 +818,79 @@ void Exchange::checkAssert(qpid::client:
% i->first % name % *(i->second) % *v).str());
}
}
+ nodeBindings.check(session);
}
}
}
-void Exchange::configure(const Address& address)
+Binding::Binding(const Variant::Map& b) :
+ exchange((Opt(b)/EXCHANGE).str()),
+ queue((Opt(b)/QUEUE).str()),
+ key((Opt(b)/KEY).str())
{
- const Variant& v = address.getOption(NODE_PROPERTIES);
- if (!v.isVoid()) {
- Variant::Map nodeProps = v.asMap();
- durable = nodeProps[DURABLE];
- Variant::Map::const_iterator x = nodeProps.find(X_PROPERTIES);
- if (x != nodeProps.end()) {
- const Variant::Map& xProps = x->second.asMap();
- Variant::Map passthrough;
- for (Variant::Map::const_iterator i = xProps.begin(); i != xProps.end(); ++i) {
- if (i->first == xamqp::AUTO_DELETE) autoDelete = i->second;
- else if (i->first == xamqp::EXCHANGE_TYPE) { type = i->second.asString(); typeSpecified = true; }
- else if (i->first == xamqp::ALTERNATE_EXCHANGE) alternateExchange = i->second.asString();
- else passthrough[i->first] = i->second;
- }
- translate(passthrough, arguments);
+ (Opt(b)/ARGUMENTS).collect(arguments);
+}
+
+Binding::Binding(const std::string& e, const std::string& q, const std::string& k) : exchange(e), queue(q), key(k) {}
+
+
+void Bindings::add(const Variant::List& list)
+{
+ for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
+ push_back(Binding(i->asMap()));
+ }
+}
+
+void Bindings::setDefaultExchange(const std::string& exchange)
+{
+ for (Bindings::iterator i = begin(); i != end(); ++i) {
+ if (i->exchange.empty()) i->exchange = exchange;
+ }
+}
+
+void Bindings::setDefaultQueue(const std::string& queue)
+{
+ for (Bindings::iterator i = begin(); i != end(); ++i) {
+ if (i->queue.empty()) i->queue = queue;
+ }
+}
+
+void Bindings::bind(qpid::client::AsyncSession& session)
+{
+ try {
+ for (Bindings::const_iterator i = begin(); i != end(); ++i) {
+ session.exchangeBind(arg::queue=i->queue,
+ arg::exchange=i->exchange,
+ arg::bindingKey=i->key,
+ arg::arguments=i->arguments);
}
+ session.sync();
+ } catch (const qpid::Exception& e) {
+ throw InvalidAddress((boost::format("Could not create node bindings: %1%") % e.what()).str());
}
}
+void Bindings::unbind(qpid::client::AsyncSession& session)
+{
+ for (Bindings::const_iterator i = begin(); i != end(); ++i) {
+ session.exchangeUnbind(arg::queue=i->queue,
+ arg::exchange=i->exchange,
+ arg::bindingKey=i->key);
+ }
+}
+
+void Bindings::check(qpid::client::AsyncSession& session)
+{
+ for (Bindings::const_iterator i = begin(); i != end(); ++i) {
+ ExchangeBoundResult result = sync(session).exchangeBound(arg::queue=i->queue,
+ arg::exchange=i->exchange,
+ arg::bindingKey=i->key);
+ if (result.getQueueNotMatched() || result.getKeyNotMatched()) {
+ throw InvalidAddress((boost::format("No such binding [exchange=%1%, queue=%2%, key=%3%]")
+ % i->exchange % i->queue % i->key).str());
+ }
+ }
+}
bool Node::enabled(const Variant& policy, CheckMode mode)
{
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp?rev=927144&r1=927143&r2=927144&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp Wed Mar 24 18:35:11 2010
@@ -114,7 +114,7 @@ void Address::setOptions(const Variant::
namespace{
const Variant EMPTY_VARIANT;
const std::string EMPTY_STRING;
-const std::string NODE_PROPERTIES="node-properties";
+const std::string NODE_PROPERTIES="node";
}
const Variant& find(const Variant::Map& map, const std::string& key)
Modified: qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=927144&r1=927143&r2=927144&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp Wed Mar 24 18:35:11 2010
@@ -564,13 +564,13 @@ struct QueueCreatePolicyFixture : public
QPID_AUTO_TEST_CASE(testCreatePolicyQueueAlways)
{
- QueueCreatePolicyFixture fix("#; {create:always, node-properties:{type:queue}}");
+ QueueCreatePolicyFixture fix("#; {create:always, node:{type:queue}}");
fix.test();
}
QPID_AUTO_TEST_CASE(testCreatePolicyQueueReceiver)
{
- QueueCreatePolicyFixture fix("#; {create:receiver, node-properties:{type:queue}}");
+ QueueCreatePolicyFixture fix("#; {create:receiver, node:{type:queue}}");
Receiver r = fix.session.createReceiver(fix.address);
fix.test();
r.close();
@@ -578,7 +578,7 @@ QPID_AUTO_TEST_CASE(testCreatePolicyQueu
QPID_AUTO_TEST_CASE(testCreatePolicyQueueSender)
{
- QueueCreatePolicyFixture fix("#; {create:sender, node-properties:{type:queue}}");
+ QueueCreatePolicyFixture fix("#; {create:sender, node:{type:queue}}");
Sender s = fix.session.createSender(fix.address);
fix.test();
s.close();
@@ -608,14 +608,14 @@ struct ExchangeCreatePolicyFixture : pub
QPID_AUTO_TEST_CASE(testCreatePolicyTopic)
{
- ExchangeCreatePolicyFixture fix("#; {create:always, node-properties:{type:topic}}",
+ ExchangeCreatePolicyFixture fix("#; {create:always, node:{type:topic}}",
"topic");
fix.test();
}
QPID_AUTO_TEST_CASE(testCreatePolicyTopicReceiverFanout)
{
- ExchangeCreatePolicyFixture fix("#/my-subject; {create:receiver, node-properties:{type:topic, x-properties:{type:fanout}}}", "fanout");
+ ExchangeCreatePolicyFixture fix("#/my-subject; {create:receiver, node:{type:topic, x-declare:{type:fanout}}}", "fanout");
Receiver r = fix.session.createReceiver(fix.address);
fix.test();
r.close();
@@ -623,7 +623,7 @@ QPID_AUTO_TEST_CASE(testCreatePolicyTopi
QPID_AUTO_TEST_CASE(testCreatePolicyTopicSenderDirect)
{
- ExchangeCreatePolicyFixture fix("#/my-subject; {create:sender, node-properties:{type:topic, x-properties:{type:direct}}}", "direct");
+ ExchangeCreatePolicyFixture fix("#/my-subject; {create:sender, node:{type:topic, x-declare:{type:direct}}}", "direct");
Sender s = fix.session.createSender(fix.address);
fix.test();
s.close();
@@ -746,18 +746,18 @@ QPID_AUTO_TEST_CASE(testDeletePolicyExch
QPID_AUTO_TEST_CASE(testAssertPolicyQueue)
{
MessagingFixture fix;
- std::string a1 = "q; {create:always, assert:always, node-properties:{type:queue, durable:false, x-properties:{qpid.max-count:100}}}";
+ std::string a1 = "q; {create:always, assert:always, node:{type:queue, durable:false, x-declare:{arguments:{qpid.max-count:100}}}}";
Sender s1 = fix.session.createSender(a1);
s1.close();
Receiver r1 = fix.session.createReceiver(a1);
r1.close();
- std::string a2 = "q; {assert:receiver, node-properties:{durable:true, x-properties:{qpid.max-count:100}}}";
+ std::string a2 = "q; {assert:receiver, node:{durable:true, x-declare:{arguments:{qpid.max-count:100}}}}";
Sender s2 = fix.session.createSender(a2);
s2.close();
BOOST_CHECK_THROW(fix.session.createReceiver(a2), qpid::messaging::InvalidAddress);
- std::string a3 = "q; {assert:sender, node-properties:{x-properties:{qpid.max-count:99}}}";
+ std::string a3 = "q; {assert:sender, node:{x-declare:{arguments:{qpid.max-count:99}}}}";
BOOST_CHECK_THROW(fix.session.createSender(a3), qpid::messaging::InvalidAddress);
Receiver r3 = fix.session.createReceiver(a3);
r3.close();
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org