You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2009/11/20 00:07:37 UTC
svn commit: r882349 -
/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
Author: gsim
Date: Thu Nov 19 23:07:37 2009
New Revision: 882349
URL: http://svn.apache.org/viewvc?rev=882349&view=rev
Log:
QPID-664: Refactored address resolution; ensure type is asserted on when required; moved exclusive option for subscribe.
Modified:
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp?rev=882349&r1=882348&r2=882349&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp Thu Nov 19 23:07:37 2009
@@ -60,6 +60,7 @@
namespace{
+const Variant EMPTY_VARIANT;
const FieldTable EMPTY_FIELD_TABLE;
const std::string EMPTY_STRING;
@@ -160,6 +161,7 @@
private:
std::string type;
+ bool typeSpecified;
bool durable;
bool autoDelete;
std::string alternateExchange;
@@ -269,50 +271,42 @@
return in(address.getOption(RELIABILITY), list_of<std::string>(AT_LEAST_ONCE)(EXACTLY_ONCE));
}
-std::auto_ptr<MessageSource> AddressResolution::resolveSource(qpid::client::Session session,
- const Address& address)
+std::string checkAddressType(qpid::client::Session session, const Address& address)
{
- ExchangeBoundResult result = session.exchangeBound(arg::exchange=address.getName(), arg::queue=address.getName());
- if (result.getQueueNotFound() && result.getExchangeNotFound()) {
- //neither a queue nor an exchange exists with that name
- if (address.getType() == TOPIC_ADDRESS) {
- std::auto_ptr<MessageSource> source(new Subscription(address));
- QPID_LOG(debug, "treating source address as topic: " << address);
- return source;
- } else if (address.getType() == QUEUE_ADDRESS || address.getType().empty()) {
- std::auto_ptr<MessageSource> source(new QueueSource(address));
- QPID_LOG(debug, "treating source address as queue: " << address);
- return source;
+ std::string type = address.getType();
+ if (type.empty()) {
+ ExchangeBoundResult result = session.exchangeBound(arg::exchange=address.getName(), arg::queue=address.getName());
+ if (result.getQueueNotFound() && result.getExchangeNotFound()) {
+ //neither a queue nor an exchange exists with that name; treat it as a queue
+ type = QUEUE_ADDRESS;
+ } else if (result.getExchangeNotFound()) {
+ //name refers to a queue
+ type = QUEUE_ADDRESS;
+ } else if (result.getQueueNotFound()) {
+ //name refers to an exchange
+ type = TOPIC_ADDRESS;
} else {
- throw InvalidAddress("Unrecognised type: " + address.getType());
+ //both a queue and exchange exist for that name
+ throw InvalidAddress("Ambiguous address, please specify queue or topic as node type");
}
- } else if (result.getQueueNotFound()) {
- //only an exchange exists with that name
- qpid::framing::ExchangeQueryResult result = session.exchangeQuery(address.getName());
- std::auto_ptr<MessageSource> source(new Subscription(address, result.getType()));
- QPID_LOG(debug, "resolved source address as topic: " << address);
+ }
+ return type;
+}
+
+std::auto_ptr<MessageSource> AddressResolution::resolveSource(qpid::client::Session session,
+ const Address& address)
+{
+ std::string type = checkAddressType(session, address);
+ if (type == TOPIC_ADDRESS) {
+ std::auto_ptr<MessageSource> source(new Subscription(address));
+ QPID_LOG(debug, "treating source address as topic: " << address);
return source;
- } else if (result.getExchangeNotFound()) {
- //only an queue exists with that name
+ } else if (type == QUEUE_ADDRESS) {
std::auto_ptr<MessageSource> source(new QueueSource(address));
- QPID_LOG(debug, "resolved source address as queue: " << address);
+ QPID_LOG(debug, "treating source address as queue: " << address);
return source;
} else {
- //both a queue and exchange exist for that name
- if (address.getType() == TOPIC_ADDRESS) {
- qpid::framing::ExchangeQueryResult result = session.exchangeQuery(address.getName());
- std::auto_ptr<MessageSource> source(new Subscription(address, result.getType()));
- QPID_LOG(debug, "resolved source address as topic: " << address);
- return source;
- } else if (address.getType() == QUEUE_ADDRESS) {
- std::auto_ptr<MessageSource> source(new QueueSource(address));
- QPID_LOG(debug, "resolved source address as queue: " << address);
- return source;
- } else if (address.getType().empty()) {
- throw InvalidAddress("Ambiguous address, please specify queue or topic as node type");
- } else {
- throw InvalidAddress("Unrecognised type: " + address.getType());
- }
+ throw InvalidAddress("Unrecognised type: " + type);
}
}
@@ -320,45 +314,29 @@
std::auto_ptr<MessageSink> AddressResolution::resolveSink(qpid::client::Session session,
const qpid::messaging::Address& address)
{
- ExchangeBoundResult result = session.exchangeBound(arg::exchange=address.getName(), arg::queue=address.getName());
- if (result.getQueueNotFound() && result.getExchangeNotFound()) {
- //neither a queue nor an exchange exists with that name
- if (address.getType() == TOPIC_ADDRESS) {
- std::auto_ptr<MessageSink> sink(new ExchangeSink(address));
- QPID_LOG(debug, "treating target address as topic: " << address);
- return sink;
- } else if (address.getType() == QUEUE_ADDRESS || address.getType().empty()) {
- std::auto_ptr<MessageSink> sink(new QueueSink(address));
- QPID_LOG(debug, "treating target address as queue: " << address);
- return sink;
- } else {
- throw InvalidAddress("Unrecognised type: " + address.getType());
- }
- } else if (result.getQueueNotFound()) {
- //only an exchange exists with that name
+ std::string type = checkAddressType(session, address);
+ if (type == TOPIC_ADDRESS) {
std::auto_ptr<MessageSink> sink(new ExchangeSink(address));
- QPID_LOG(debug, "resolved target address as topic: " << address);
+ QPID_LOG(debug, "treating target address as topic: " << address);
return sink;
- } else if (result.getExchangeNotFound()) {
- //only an queue exists with that name
+ } else if (type == QUEUE_ADDRESS) {
std::auto_ptr<MessageSink> sink(new QueueSink(address));
- QPID_LOG(debug, "resolved target address as queue: " << address);
+ QPID_LOG(debug, "treating target address as queue: " << address);
return sink;
} else {
- //both a queue and exchange exist for that name
- if (address.getType() == TOPIC_ADDRESS) {
- std::auto_ptr<MessageSink> sink(new ExchangeSink(address));
- QPID_LOG(debug, "resolved target address as topic: " << address);
- return sink;
- } else if (address.getType() == QUEUE_ADDRESS) {
- std::auto_ptr<MessageSink> sink(new QueueSink(address));
- QPID_LOG(debug, "resolved target address as queue: " << address);
- return sink;
- } else if (address.getType().empty()) {
- throw InvalidAddress("Ambiguous address, please specify queue or topic as node type");
- } else {
- throw InvalidAddress("Unrecognised type: " + address.getType());
- }
+ throw InvalidAddress("Unrecognised type: " + type);
+ }
+}
+
+const Variant& getNestedOption(const Variant::Map& options, const std::vector<std::string>& keys, size_t index = 0)
+{
+ Variant::Map::const_iterator i = options.find(keys[index]);
+ if (i == options.end()) {
+ return EMPTY_VARIANT;
+ } else if (index+1 < keys.size()) {
+ return getNestedOption(i->second.asMap(), keys, index+1);
+ } else {
+ return i->second;
}
}
@@ -366,7 +344,7 @@
Queue(address),
acceptMode(is_unreliable(address) ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT),
acquireMode(address.getOption(BROWSE).asBool() ? ACQUIRE_MODE_NOT_ACQUIRED : ACQUIRE_MODE_PRE_ACQUIRED),
- exclusive(address.getOption(EXCLUSIVE).asBool())
+ exclusive(getNestedOption(address.getOptions(), list_of<std::string>(X_PROPERTIES)(EXCLUSIVE)).asBool())
{
//extract subscription arguments from address options
convert(address.getOption(xamqp::SUBSCRIBE_ARGUMENTS), options);
@@ -713,6 +691,7 @@
Exchange::Exchange(const Address& a) : Node(a),
type(TOPIC_EXCHANGE),
+ typeSpecified(false),
durable(false),
autoDelete(false)
{
@@ -735,8 +714,10 @@
} else {
try {
sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true);
- } catch (const qpid::Exception& e) {
- throw InvalidAddress((boost::format("Exchange %1% does not exist; %2%") % name % e.what()).str());
+ } catch (const qpid::framing::NotFoundException& e) {
+ throw InvalidAddress((boost::format("Exchange %1% does not exist") % name).str());
+ } catch (const std::exception& e) {
+ throw InvalidAddress(e.what());
}
}
}
@@ -756,11 +737,11 @@
void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
{
if (enabled(assertPolicy, mode)) {
- ExchangeQueryResult result = sync(session).exchangeQuery(arg::exchange=name);
+ ExchangeQueryResult result = sync(session).exchangeQuery(name);
if (result.getNotFound()) {
throw InvalidAddress((boost::format("Exchange not found: %1%") % name).str());
} else {
- if (!type.empty() && result.getType() != type) {
+ if (typeSpecified && result.getType() != type) {
throw InvalidAddress((boost::format("Exchange %1% is of incorrect type, expected %2% but got %3%")
% name % type % result.getType()).str());
}
@@ -795,7 +776,7 @@
Variant::Map passthrough;
for (Variant::Map::const_iterator i = xProps.begin(); i != xProps.end(); ++i) {
if (i->first == xamqp::AUTO_DELETE) autoDelete = i->second;
- else if (i->first == xamqp::EXCHANGE_TYPE) type = i->second.asString();
+ else if (i->first == xamqp::EXCHANGE_TYPE) { type = i->second.asString(); typeSpecified = true; }
else if (i->first == xamqp::ALTERNATE_EXCHANGE) alternateExchange = i->second.asString();
else passthrough[i->first] = i->second;
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org