You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2009/11/11 14:15:46 UTC
svn commit: r834869 [1/2] - in /qpid/trunk/qpid/cpp: examples/messaging/
include/qpid/messaging/ src/ src/qpid/client/amqp0_10/ src/qpid/messaging/
src/tests/
Author: gsim
Date: Wed Nov 11 13:15:44 2009
New Revision: 834869
URL: http://svn.apache.org/viewvc?rev=834869&view=rev
Log:
Added support for address parsing, create/assert/delete policies
Added:
qpid/trunk/qpid/cpp/src/tests/Address.cpp
Removed:
qpid/trunk/qpid/cpp/include/qpid/messaging/Filter.h
qpid/trunk/qpid/cpp/src/qpid/messaging/Filter.cpp
Modified:
qpid/trunk/qpid/cpp/examples/messaging/client.cpp
qpid/trunk/qpid/cpp/examples/messaging/topic_listener.cpp
qpid/trunk/qpid/cpp/examples/messaging/topic_receiver.cpp
qpid/trunk/qpid/cpp/include/qpid/messaging/Address.h
qpid/trunk/qpid/cpp/include/qpid/messaging/Message.h
qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h
qpid/trunk/qpid/cpp/include/qpid/messaging/Variant.h
qpid/trunk/qpid/cpp/src/CMakeLists.txt
qpid/trunk/qpid/cpp/src/Makefile.am
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h
qpid/trunk/qpid/cpp/src/qpid/messaging/Variant.cpp
qpid/trunk/qpid/cpp/src/tests/Makefile.am
qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
Modified: qpid/trunk/qpid/cpp/examples/messaging/client.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/examples/messaging/client.cpp?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/examples/messaging/client.cpp (original)
+++ qpid/trunk/qpid/cpp/examples/messaging/client.cpp Wed Nov 11 13:15:44 2009
@@ -46,7 +46,7 @@
Sender sender = session.createSender("service_queue");
//create temp queue & receiver...
- Address responseQueue = session.createTempQueue();
+ Address responseQueue("#response-queue {create:always, type:queue, node-properties:{x-amqp0-10-auto-delete:true}}");
Receiver receiver = session.createReceiver(responseQueue);
// Now send some messages ...
Modified: qpid/trunk/qpid/cpp/examples/messaging/topic_listener.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/examples/messaging/topic_listener.cpp?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/examples/messaging/topic_listener.cpp (original)
+++ qpid/trunk/qpid/cpp/examples/messaging/topic_listener.cpp Wed Nov 11 13:15:44 2009
@@ -20,11 +20,11 @@
*/
#include <qpid/messaging/Connection.h>
-#include <qpid/messaging/Filter.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/MessageListener.h>
#include <qpid/messaging/Session.h>
#include <qpid/messaging/Receiver.h>
+#include <qpid/messaging/Variant.h>
#include <cstdlib>
#include <iostream>
@@ -57,15 +57,14 @@
}
int main(int argc, char** argv) {
- const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
- const char* pattern = argc>2 ? argv[2] : "#.#";
+ const std::string url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
+ const std::string pattern = argc>2 ? argv[2] : "#.#";
try {
Connection connection = Connection::open(url);
Session session = connection.newSession();
- Filter filter(Filter::WILDCARD, pattern, "control");
- Receiver receiver = session.createReceiver("news_service", filter);
+ Receiver receiver = session.createReceiver("news_service {filter:[control, " + pattern + "]}");
Listener listener(receiver);
receiver.setListener(&listener);
receiver.setCapacity(1);
@@ -78,5 +77,3 @@
}
return 1;
}
-
-
Modified: qpid/trunk/qpid/cpp/examples/messaging/topic_receiver.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/examples/messaging/topic_receiver.cpp?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/examples/messaging/topic_receiver.cpp (original)
+++ qpid/trunk/qpid/cpp/examples/messaging/topic_receiver.cpp Wed Nov 11 13:15:44 2009
@@ -19,32 +19,25 @@
*
*/
-#include <qpid/messaging/Address.h>
#include <qpid/messaging/Connection.h>
-#include <qpid/messaging/Filter.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Session.h>
+#include <qpid/messaging/Variant.h>
#include <cstdlib>
#include <iostream>
-#include <sstream>
-
using namespace qpid::messaging;
-using std::stringstream;
-using std::string;
-
int main(int argc, char** argv) {
- const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
- const char* pattern = argc>2 ? argv[2] : "#.#";
+ const std::string url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
+ const std::string pattern = argc>2 ? argv[2] : "#.#";
try {
Connection connection = Connection::open(url);
Session session = connection.newSession();
- Filter filter(Filter::WILDCARD, pattern, "control");
- Receiver receiver = session.createReceiver(Address("news_service", "topic"), filter);
+ Receiver receiver = session.createReceiver("news_service {filter:[control, " + pattern + "]}");
while (true) {
Message message = receiver.fetch();
std::cout << "Message: " << message.getContent() << std::endl;
Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Address.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Address.h?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Address.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Address.h Wed Nov 11 13:15:44 2009
@@ -22,33 +22,71 @@
*
*/
#include <string>
+#include "qpid/Exception.h"
+#include "qpid/messaging/Variant.h"
#include "qpid/client/ClientImportExport.h"
#include <ostream>
namespace qpid {
-namespace client {
-}
-
namespace messaging {
+struct InvalidAddress : public qpid::Exception
+{
+ InvalidAddress(const std::string& msg);
+};
+
+struct MalformedAddress : public qpid::Exception
+{
+ MalformedAddress(const std::string& msg);
+};
+
+class AddressImpl;
+
/**
* Represents an address to which messages can be sent and from which
* messages can be received. Often a simple name is sufficient for
- * this. However this struct allows the type of address to be
- * specified allowing more sophisticated treatment if necessary.
+ * this, however this can be augmented with a subject pattern and
+ * options.
+ *
+ * All parts of an address can be specified in a string of the
+ * following form:
+ *
+ * <address> [ / <subject> ] [ { <key> : <value> , ... } ]
+ *
+ * Here the <address> is a simple name for the addressed entity and
+ * <subject> is a subject or subject pattern for messages sent to or
+ * received from this address. The options are specified as a series
+ * of key value pairs enclosed in curly brackets (denoting a map).
*/
-struct Address
+class Address
{
- std::string value;
- std::string type;
-
+ public:
QPID_CLIENT_EXTERN Address();
QPID_CLIENT_EXTERN Address(const std::string& address);
- QPID_CLIENT_EXTERN Address(const std::string& address, const std::string& type);
- QPID_CLIENT_EXTERN operator const std::string&() const;
- QPID_CLIENT_EXTERN const std::string& toStr() const;
+ QPID_CLIENT_EXTERN Address(const std::string& name, const std::string& subject,
+ const Variant::Map& options, const std::string& type = "");
+ QPID_CLIENT_EXTERN Address(const Address& address);
+ QPID_CLIENT_EXTERN ~Address();
+ QPID_CLIENT_EXTERN Address& operator=(const Address&); // annotated like the copy constructor and destructor
+ QPID_CLIENT_EXTERN const std::string& getName() const;
+ QPID_CLIENT_EXTERN void setName(const std::string&);
+ QPID_CLIENT_EXTERN const std::string& getSubject() const;
+ QPID_CLIENT_EXTERN void setSubject(const std::string&);
+ QPID_CLIENT_EXTERN bool hasSubject() const;
+ QPID_CLIENT_EXTERN const Variant::Map& getOptions() const;
+ QPID_CLIENT_EXTERN Variant::Map& getOptions();
+ QPID_CLIENT_EXTERN void setOptions(const Variant::Map&);
+
+ QPID_CLIENT_EXTERN std::string getType() const;
+ QPID_CLIENT_EXTERN void setType(const std::string&);
+
+ QPID_CLIENT_EXTERN const Variant& getOption(const std::string& key) const;
+
+ QPID_CLIENT_EXTERN std::string toStr() const;
QPID_CLIENT_EXTERN operator bool() const;
QPID_CLIENT_EXTERN bool operator !() const;
+ private:
+ AddressImpl* impl;
};
QPID_CLIENT_EXTERN std::ostream& operator<<(std::ostream& out, const Address& address);
Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Message.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Message.h?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Message.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Message.h Wed Nov 11 13:15:44 2009
@@ -32,7 +32,7 @@
namespace messaging {
-struct Address;
+class Address;
class Codec;
struct MessageImpl;
Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h Wed Nov 11 13:15:44 2009
@@ -24,7 +24,7 @@
#include "qpid/client/ClientImportExport.h"
#include "qpid/client/Handle.h"
#include "qpid/sys/Time.h"
-#include "Variant.h"
+#include <string>
namespace qpid {
namespace client {
@@ -35,8 +35,7 @@
namespace messaging {
-struct Address;
-struct Filter;
+class Address;
class Message;
class MessageListener;
class Sender;
@@ -90,13 +89,10 @@
QPID_CLIENT_EXTERN Message fetch(qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE);
QPID_CLIENT_EXTERN bool dispatch(qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE);
- QPID_CLIENT_EXTERN Sender createSender(const Address& address, const VariantMap& options = VariantMap());
- QPID_CLIENT_EXTERN Sender createSender(const std::string& address, const VariantMap& options = VariantMap());
-
- QPID_CLIENT_EXTERN Receiver createReceiver(const Address& address, const VariantMap& options = VariantMap());
- QPID_CLIENT_EXTERN Receiver createReceiver(const Address& address, const Filter& filter, const VariantMap& options = VariantMap());
- QPID_CLIENT_EXTERN Receiver createReceiver(const std::string& address, const VariantMap& options = VariantMap());
- QPID_CLIENT_EXTERN Receiver createReceiver(const std::string& address, const Filter& filter, const VariantMap& options = VariantMap());
+ QPID_CLIENT_EXTERN Sender createSender(const Address& address);
+ QPID_CLIENT_EXTERN Sender createSender(const std::string& address);
+ QPID_CLIENT_EXTERN Receiver createReceiver(const Address& address);
+ QPID_CLIENT_EXTERN Receiver createReceiver(const std::string& address);
QPID_CLIENT_EXTERN Address createTempQueue(const std::string& baseName = std::string());
private:
Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Variant.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Variant.h?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Variant.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Variant.h Wed Nov 11 13:15:44 2009
@@ -30,9 +30,6 @@
#include "qpid/client/ClientImportExport.h"
namespace qpid {
-namespace client {
-}
-
namespace messaging {
/**
@@ -93,6 +90,7 @@
QPID_CLIENT_EXTERN ~Variant();
QPID_CLIENT_EXTERN VariantType getType() const;
+ QPID_CLIENT_EXTERN bool isVoid() const;
QPID_CLIENT_EXTERN Variant& operator=(bool);
QPID_CLIENT_EXTERN Variant& operator=(uint8_t);
Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Wed Nov 11 13:15:44 2009
@@ -594,7 +594,6 @@
qpid/messaging/Address.cpp
qpid/messaging/Connection.cpp
qpid/messaging/ConnectionImpl.h
- qpid/messaging/Filter.cpp
qpid/messaging/ListContent.cpp
qpid/messaging/ListView.cpp
qpid/messaging/MapContent.cpp
Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Wed Nov 11 13:15:44 2009
@@ -686,7 +686,6 @@
qpid/client/SubscriptionManagerImpl.h \
qpid/messaging/Address.cpp \
qpid/messaging/Connection.cpp \
- qpid/messaging/Filter.cpp \
qpid/messaging/ListContent.cpp \
qpid/messaging/ListView.cpp \
qpid/messaging/MapContent.cpp \
@@ -795,7 +794,6 @@
../include/qpid/messaging/Address.h \
../include/qpid/messaging/Connection.h \
../include/qpid/messaging/Codec.h \
- ../include/qpid/messaging/Filter.h \
../include/qpid/messaging/ListContent.h \
../include/qpid/messaging/ListView.h \
../include/qpid/messaging/MapContent.h \
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp Wed Nov 11 13:15:44 2009
@@ -20,18 +20,24 @@
*/
#include "qpid/client/amqp0_10/AddressResolution.h"
#include "qpid/client/amqp0_10/Codecs.h"
+#include "qpid/client/amqp0_10/CodecsInternal.h"
#include "qpid/client/amqp0_10/MessageSource.h"
#include "qpid/client/amqp0_10/MessageSink.h"
#include "qpid/client/amqp0_10/OutgoingMessage.h"
#include "qpid/messaging/Address.h"
-#include "qpid/messaging/Filter.h"
#include "qpid/messaging/Message.h"
+#include "qpid/messaging/Variant.h"
#include "qpid/Exception.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/enum.h"
+#include "qpid/framing/ExchangeQueryResult.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/QueueQueryResult.h"
#include "qpid/framing/ReplyTo.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/Uuid.h"
+#include <boost/assign.hpp>
+#include <boost/format.hpp>
namespace qpid {
namespace client {
@@ -40,61 +46,145 @@
using qpid::Exception;
using qpid::messaging::Address;
using qpid::messaging::Filter;
+using qpid::messaging::InvalidAddress;
using qpid::messaging::Variant;
+using qpid::framing::ExchangeQueryResult;
using qpid::framing::FieldTable;
+using qpid::framing::QueueQueryResult;
using qpid::framing::ReplyTo;
+using qpid::framing::Uuid;
using namespace qpid::framing::message;
+using namespace boost::assign;
namespace{
-const Variant EMPTY_VARIANT;
const FieldTable EMPTY_FIELD_TABLE;
const std::string EMPTY_STRING;
//option names
const std::string BROWSE("browse");
const std::string EXCLUSIVE("exclusive");
-const std::string MODE("mode");
-const std::string NAME("name");
-const std::string UNACKNOWLEDGED("unacknowledged");
+const std::string NO_LOCAL("no-local");
+const std::string FILTER("filter");
+const std::string RELIABILITY("reliability");
+const std::string NAME("subscription-name");
+const std::string NODE_PROPERTIES("node-properties");
+
+//policy types
+const std::string CREATE("create");
+const std::string ASSERT("assert");
+const std::string DELETE("delete");
+//policy values
+const std::string ALWAYS("always");
+const std::string NEVER("never");
+const std::string RECEIVER("receiver");
+const std::string SENDER("sender");
const std::string QUEUE_ADDRESS("queue");
const std::string TOPIC_ADDRESS("topic");
-const std::string TOPIC_ADDRESS_AND_SUBJECT("topic+");
-const std::string DIVIDER("/");
-const std::string SIMPLE_SUBSCRIPTION("simple");
-const std::string RELIABLE_SUBSCRIPTION("reliable");
+const std::string UNRELIABLE("unreliable");
+const std::string AT_MOST_ONCE("at-most-once");
+const std::string AT_LEAST_ONCE("at-least-once");
+const std::string EXACTLY_ONCE("exactly-once");
const std::string DURABLE_SUBSCRIPTION("durable");
+const std::string DURABLE("durable");
+
+const std::string TOPIC_EXCHANGE("topic");
+const std::string FANOUT_EXCHANGE("fanout");
+const std::string DIRECT_EXCHANGE("direct");
+const std::string HEADERS_EXCHANGE("headers");
+const std::string XML_EXCHANGE("xml");
+const std::string WILDCARD_ANY("*");
+}
+
+//some amqp 0-10 specific options (keys looked up in the address option map)
+namespace xamqp{
+const std::string AUTO_DELETE("x-amqp0-10-auto-delete");
+const std::string EXCHANGE_TYPE("x-amqp0-10-exchange-type");
+const std::string EXCLUSIVE("x-amqp0-10-exclusive");
+const std::string ALTERNATE_EXCHANGE("x-amqp0-10-alternate-exchange");
+const std::string ARGUMENTS("x-amqp0-10-arguments");
+const std::string QUEUE_ARGUMENTS("x-amqp0-10-queue-arguments");
+const std::string SUBSCRIBE_ARGUMENTS("x-amqp0-10-subscribe-arguments"); //must differ from QUEUE_ARGUMENTS: Subscription reads both keys
}
-class QueueSource : public MessageSource
+class Node
+{
+ protected:
+ enum CheckMode {FOR_RECEIVER, FOR_SENDER};
+
+ Node(const Address& address);
+
+ const std::string name;
+ Variant createPolicy;
+ Variant assertPolicy;
+ Variant deletePolicy;
+
+ static bool enabled(const Variant& policy, CheckMode mode);
+ static bool createEnabled(const Address& address, CheckMode mode);
+ static void convert(const Variant& option, FieldTable& arguments);
+ static std::vector<std::string> RECEIVER_MODES;
+ static std::vector<std::string> SENDER_MODES;
+};
+
+class Queue : protected Node
{
public:
- QueueSource(const std::string& name, AcceptMode=ACCEPT_MODE_EXPLICIT, AcquireMode=ACQUIRE_MODE_PRE_ACQUIRED,
- bool exclusive = false, const FieldTable& options = EMPTY_FIELD_TABLE);
+ Queue(const Address& address);
+ protected:
+ void checkCreate(qpid::client::AsyncSession&, CheckMode);
+ void checkAssert(qpid::client::AsyncSession&, CheckMode);
+ void checkDelete(qpid::client::AsyncSession&, CheckMode);
+ private:
+ bool durable;
+ bool autoDelete;
+ bool exclusive;
+ std::string alternateExchange;
+ FieldTable arguments;
+
+ void configure(const Address&);
+};
+
+class Exchange : protected Node
+{
+ public:
+ Exchange(const Address& address);
+ protected:
+ void checkCreate(qpid::client::AsyncSession&, CheckMode);
+ void checkAssert(qpid::client::AsyncSession&, CheckMode);
+ void checkDelete(qpid::client::AsyncSession&, CheckMode);
+ const std::string& getDesiredExchangeType() { return type; }
+
+ private:
+ std::string type;
+ bool durable;
+ bool autoDelete;
+ std::string alternateExchange;
+ FieldTable arguments;
+
+ void configure(const Address&);
+};
+
+class QueueSource : public Queue, public MessageSource
+{
+ public:
+ QueueSource(const Address& address);
void subscribe(qpid::client::AsyncSession& session, const std::string& destination);
void cancel(qpid::client::AsyncSession& session, const std::string& destination);
private:
- const std::string name;
const AcceptMode acceptMode;
const AcquireMode acquireMode;
const bool exclusive;
- const FieldTable options;
+ FieldTable options;
};
-class Subscription : public MessageSource
+class Subscription : public Exchange, public MessageSource
{
public:
- enum SubscriptionMode {SIMPLE, RELIABLE, DURABLE};
-
- Subscription(const std::string& name, SubscriptionMode mode = SIMPLE,
- const FieldTable& queueOptions = EMPTY_FIELD_TABLE, const FieldTable& subscriptionOptions = EMPTY_FIELD_TABLE);
- void add(const std::string& exchange, const std::string& key, const FieldTable& options = EMPTY_FIELD_TABLE);
+ Subscription(const Address&, const std::string& exchangeType);
void subscribe(qpid::client::AsyncSession& session, const std::string& destination);
void cancel(qpid::client::AsyncSession& session, const std::string& destination);
-
- static SubscriptionMode getMode(const std::string& mode);
private:
struct Binding
{
@@ -107,155 +197,138 @@
typedef std::vector<Binding> Bindings;
- const std::string name;
- const bool autoDelete;
+ const std::string queue;
+ const bool reliable;
const bool durable;
- const FieldTable queueOptions;
- const FieldTable subscriptionOptions;
+ FieldTable queueOptions;
+ FieldTable subscriptionOptions;
Bindings bindings;
- std::string queue;
+
+ void bindSpecial(const std::string& exchangeType);
+ void bind(const Variant& filter);
+ void bind(const Variant::Map& filter);
+ void bind(const Variant::List& filter);
+ void add(const std::string& exchange, const std::string& key, const FieldTable& options = EMPTY_FIELD_TABLE);
+ static std::string getSubscriptionName(const std::string& base, const Variant& name);
};
-class Exchange : public MessageSink
+class ExchangeSink : public Exchange, public MessageSink
{
public:
- Exchange(const std::string& name, const std::string& defaultSubject = EMPTY_STRING,
- bool passive = true, const std::string& type = EMPTY_STRING, bool durable = false,
- const FieldTable& options = EMPTY_FIELD_TABLE);
+ ExchangeSink(const Address& name);
void declare(qpid::client::AsyncSession& session, const std::string& name);
void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message);
void cancel(qpid::client::AsyncSession& session, const std::string& name);
private:
- const std::string name;
const std::string defaultSubject;
- const bool passive;
- const std::string type;
- const bool durable;
- const FieldTable options;
};
-class QueueSink : public MessageSink
+class QueueSink : public Queue, public MessageSink
{
public:
- QueueSink(const std::string& name, bool passive=true, bool exclusive=false,
- bool autoDelete=false, bool durable=false, const FieldTable& options = EMPTY_FIELD_TABLE);
+ QueueSink(const Address& name);
void declare(qpid::client::AsyncSession& session, const std::string& name);
void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message);
void cancel(qpid::client::AsyncSession& session, const std::string& name);
private:
- const std::string name;
- const bool passive;
- const bool exclusive;
- const bool autoDelete;
- const bool durable;
- const FieldTable options;
};
bool isQueue(qpid::client::Session session, const qpid::messaging::Address& address);
-bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address, std::string& subject);
+bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address);
+
+bool in(const Variant& value, const std::vector<std::string>& choices)
+{
+ if (!value.isVoid()) {
+ for (std::vector<std::string>::const_iterator i = choices.begin(); i != choices.end(); ++i) {
+ if (value.asString() == *i) return true;
+ }
+ }
+ return false;
+}
+
+bool getReceiverPolicy(const Address& address, const std::string& key)
+{
+ return in(address.getOption(key), list_of<std::string>(ALWAYS)(RECEIVER));
+}
+
+bool getSenderPolicy(const Address& address, const std::string& key)
+{
+ return in(address.getOption(key), list_of<std::string>(ALWAYS)(SENDER));
+}
-const Variant& getOption(const std::string& key, const Variant::Map& options)
+bool is_unreliable(const Address& address)
{
- Variant::Map::const_iterator i = options.find(key);
- if (i == options.end()) return EMPTY_VARIANT;
- else return i->second;
+ return in(address.getOption(RELIABILITY), list_of<std::string>(UNRELIABLE)(AT_MOST_ONCE));
+}
+
+bool is_reliable(const Address& address)
+{
+ return in(address.getOption(RELIABILITY), list_of<std::string>(AT_LEAST_ONCE)(EXACTLY_ONCE));
}
std::auto_ptr<MessageSource> AddressResolution::resolveSource(qpid::client::Session session,
- const Address& address,
- const Filter* filter,
- const Variant::Map& options)
+ const Address& address)
{
//TODO: handle case where there exists a queue and an exchange of
//the same name (hence an unqualified address is ambiguous)
//TODO: make sure specified address type gives sane error message
- //if it does npt match the configuration on server
+ //if it does not match the configuration on server
- if (isQueue(session, address)) {
- //TODO: support auto-created queue as source, if requested by specific option
-
- AcceptMode accept = getOption(UNACKNOWLEDGED, options).asBool() ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT;
- AcquireMode acquire = getOption(BROWSE, options).asBool() ? ACQUIRE_MODE_NOT_ACQUIRED : ACQUIRE_MODE_PRE_ACQUIRED;
- bool exclusive = getOption(EXCLUSIVE, options).asBool();
- FieldTable arguments;
- //TODO: extract subscribe arguments from options (e.g. either
- //filter out already processed keys and send the rest, or have
- //a nested map)
+ /**
+ if (Node::createEnabled(address, FOR_RECEIVER)) {
+ } else {
+ }
+ **/
- std::auto_ptr<MessageSource> source =
- std::auto_ptr<MessageSource>(new QueueSource(address.value, accept, acquire, exclusive, arguments));
+ if (isQueue(session, address)) {
+ std::auto_ptr<MessageSource> source(new QueueSource(address));
+ QPID_LOG(debug, "resolved source address as queue: " << address);
return source;
} else {
- //TODO: extract queue options (e.g. no-local) and subscription options (e.g. less important)
- std::auto_ptr<Subscription> bindings =
- std::auto_ptr<Subscription>(new Subscription(getOption(NAME, options).asString(),
- Subscription::getMode(getOption(MODE, options).asString())));
-
- qpid::framing::ExchangeQueryResult result = session.exchangeQuery(address.value);
- if (result.getNotFound()) {
- throw qpid::framing::NotFoundException(QPID_MSG("Address not known: " << address));
- } else if (result.getType() == "topic") {
- if (filter) {
- if (filter->type != Filter::WILDCARD) {
- throw qpid::framing::NotImplementedException(
- QPID_MSG("Filters of type " << filter->type << " not supported by address " << address));
-
- }
- for (std::vector<std::string>::const_iterator i = filter->patterns.begin(); i != filter->patterns.end(); i++) {
- bindings->add(address.value, *i, qpid::framing::FieldTable());
- }
- } else {
- //default is to receive all messages
- bindings->add(address.value, "*", qpid::framing::FieldTable());
- }
- } else if (result.getType() == "fanout") {
- if (filter) {
- throw qpid::framing::NotImplementedException(QPID_MSG("Filters are not supported by address " << address));
- }
- bindings->add(address.value, address.value, qpid::framing::FieldTable());
- } else if (result.getType() == "direct") {
- //TODO: ????
- } else {
- //TODO: xml and headers exchanges
- throw qpid::framing::NotImplementedException(QPID_MSG("Address type not recognised for " << address));
- }
- std::auto_ptr<MessageSource> source = std::auto_ptr<MessageSource>(bindings.release());
+ qpid::framing::ExchangeQueryResult result = session.exchangeQuery(address.getName());
+ std::auto_ptr<MessageSource> source(new Subscription(address, result.getType()));
+ QPID_LOG(debug, "resolved source address as topic: " << address);
return source;
}
}
std::auto_ptr<MessageSink> AddressResolution::resolveSink(qpid::client::Session session,
- const qpid::messaging::Address& address,
- const qpid::messaging::Variant::Map& /*options*/)
+ const qpid::messaging::Address& address)
{
std::auto_ptr<MessageSink> sink;
if (isQueue(session, address)) {
- //TODO: support for auto-created queues as sink
- sink = std::auto_ptr<MessageSink>(new QueueSink(address.value));
+ sink = std::auto_ptr<MessageSink>(new QueueSink(address));
} else {
- std::string subject;
- if (isTopic(session, address, subject)) {
- //TODO: support for auto-created exchanges as sink
- sink = std::auto_ptr<MessageSink>(new Exchange(address.value, subject));
+ if (isTopic(session, address)) {
+ sink = std::auto_ptr<MessageSink>(new ExchangeSink(address));
} else {
- if (address.type.empty()) {
- throw qpid::framing::NotFoundException(QPID_MSG("Address not known: " << address));
+ if (address.getType().empty()) {
+ throw InvalidAddress(QPID_MSG("Address not known: " << address));
} else {
- throw qpid::framing::NotImplementedException(QPID_MSG("Address type not recognised: " << address.type));
+ throw InvalidAddress(QPID_MSG("Address type not recognised: " << address.getType()));
}
}
}
return sink;
}
-QueueSource::QueueSource(const std::string& _name, AcceptMode _acceptMode, AcquireMode _acquireMode, bool _exclusive, const FieldTable& _options) :
- name(_name), acceptMode(_acceptMode), acquireMode(_acquireMode), exclusive(_exclusive), options(_options) {}
+QueueSource::QueueSource(const Address& address) :
+ Queue(address),
+ acceptMode(is_unreliable(address) ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT),
+ acquireMode(address.getOption(BROWSE).asBool() ? ACQUIRE_MODE_NOT_ACQUIRED : ACQUIRE_MODE_PRE_ACQUIRED),
+ exclusive(address.getOption(EXCLUSIVE).asBool())
+{
+ //extract subscription arguments from address options
+ convert(address.getOption(xamqp::SUBSCRIBE_ARGUMENTS), options);
+}
void QueueSource::subscribe(qpid::client::AsyncSession& session, const std::string& destination)
{
+ checkCreate(session, FOR_RECEIVER);
+ checkAssert(session, FOR_RECEIVER);
session.messageSubscribe(arg::queue=name,
arg::destination=destination,
arg::acceptMode=acceptMode,
@@ -267,11 +340,48 @@
void QueueSource::cancel(qpid::client::AsyncSession& session, const std::string& destination)
{
session.messageCancel(destination);
+ checkDelete(session, FOR_RECEIVER);
+}
+
+std::string Subscription::getSubscriptionName(const std::string& base, const Variant& name)
+{
+ if (name.isVoid()) {
+ return (boost::format("%1%_%2%") % base % Uuid(true).str()).str();
+ } else {
+ return (boost::format("%1%_%2%") % base % name.asString()).str();
+ }
}
-Subscription::Subscription(const std::string& _name, SubscriptionMode mode, const FieldTable& qOptions, const FieldTable& sOptions)
- : name(_name), autoDelete(mode == SIMPLE), durable(mode == DURABLE),
- queueOptions(qOptions), subscriptionOptions(sOptions) {}
+Subscription::Subscription(const Address& address, const std::string& exchangeType)
+ : Exchange(address),
+ queue(getSubscriptionName(name, address.getOption(NAME))),
+ reliable(is_reliable(address)),
+ durable(address.getOption(DURABLE_SUBSCRIPTION).asBool())
+{
+ if (address.getOption(NO_LOCAL).asBool()) queueOptions.setInt(NO_LOCAL, 1);
+ convert(address.getOption(xamqp::QUEUE_ARGUMENTS), queueOptions);
+ convert(address.getOption(xamqp::SUBSCRIBE_ARGUMENTS), subscriptionOptions);
+
+ const Variant& filter = address.getOption(FILTER);
+ if (!filter.isVoid()) {
+ //TODO: if both subject _and_ filter are specified,
+ //combine in some way; for now we just ignore the
+ //subject in that case.
+ bind(filter);
+ } else if (address.hasSubject()) {
+ //Note: This will not work for headers- or xml- exchange;
+ //fanout exchange will do no filtering.
+ //TODO: for headers- or xml- exchange can construct a match
+ //for the subject in the application-headers
+ bind(address.getSubject());
+ } else {
+ //Neither a subject nor a filter has been defined, treat this
+ //as wanting to match all messages (Note: direct exchange is
+ //currently unable to support this case).
+ if (!exchangeType.empty()) bindSpecial(exchangeType);
+ else if (!getDesiredExchangeType().empty()) bindSpecial(getDesiredExchangeType());
+ }
+}
void Subscription::add(const std::string& exchange, const std::string& key, const FieldTable& options)
{
@@ -280,18 +390,19 @@
void Subscription::subscribe(qpid::client::AsyncSession& session, const std::string& destination)
{
- if (name.empty()) {
- //TODO: use same scheme as JMS client for subscription queue name generation?
- queue = session.getId().getName() + destination;
- } else {
- queue = name;
- }
+ //create exchange if required and specified by policy:
+ checkCreate(session, FOR_RECEIVER);
+ checkAssert(session, FOR_RECEIVER);
+
+ //create subscription queue:
session.queueDeclare(arg::queue=queue, arg::exclusive=true,
- arg::autoDelete=autoDelete, arg::durable=durable, arg::arguments=queueOptions);
+ arg::autoDelete=!reliable, arg::durable=durable, arg::arguments=queueOptions);
+ //bind subscription queue to exchange:
for (Bindings::const_iterator i = bindings.begin(); i != bindings.end(); ++i) {
session.exchangeBind(arg::queue=queue, arg::exchange=i->exchange, arg::bindingKey=i->key, arg::arguments=i->options);
}
- AcceptMode accept = autoDelete ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT;
+ //subscribe to subscription queue:
+ AcceptMode accept = reliable ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE;
session.messageSubscribe(arg::queue=queue, arg::destination=destination,
arg::exclusive=true, arg::acceptMode=accept, arg::arguments=subscriptionOptions);
}
@@ -300,36 +411,23 @@
{
session.messageCancel(destination);
session.queueDelete(arg::queue=queue);
+ checkDelete(session, FOR_RECEIVER);
}
Subscription::Binding::Binding(const std::string& e, const std::string& k, const FieldTable& o):
exchange(e), key(k), options(o) {}
-Subscription::SubscriptionMode Subscription::getMode(const std::string& s)
-{
- if (s.empty() || s == SIMPLE_SUBSCRIPTION) return SIMPLE;
- else if (s == RELIABLE_SUBSCRIPTION) return RELIABLE;
- else if (s == DURABLE_SUBSCRIPTION) return DURABLE;
- else throw Exception(QPID_MSG("Unrecognised subscription mode: " << s));
-}
-
void convert(qpid::messaging::Message& from, qpid::client::Message& to);
-Exchange::Exchange(const std::string& _name, const std::string& _defaultSubject,
- bool _passive, const std::string& _type, bool _durable, const FieldTable& _options) :
- name(_name), defaultSubject(_defaultSubject), passive(_passive), type(_type), durable(_durable), options(_options) {}
-
-void Exchange::declare(qpid::client::AsyncSession& session, const std::string&)
-{
- //TODO: should this really by synchronous? want to get error if not valid...
- if (passive) {
- sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true);
- } else {
- sync(session).exchangeDeclare(arg::exchange=name, arg::type=type, arg::durable=durable, arg::arguments=options);
- }
+ExchangeSink::ExchangeSink(const Address& address) : Exchange(address), defaultSubject(address.getSubject()) {}
+
+void ExchangeSink::declare(qpid::client::AsyncSession& session, const std::string&)
+{
+ checkCreate(session, FOR_SENDER);
+ checkAssert(session, FOR_SENDER);
}
-void Exchange::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
+void ExchangeSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
{
if (m.message.getDeliveryProperties().getRoutingKey().empty() && !defaultSubject.empty()) {
m.message.getDeliveryProperties().setRoutingKey(defaultSubject);
@@ -337,22 +435,17 @@
m.status = session.messageTransfer(arg::destination=name, arg::content=m.message);
}
-void Exchange::cancel(qpid::client::AsyncSession&, const std::string&) {}
+void ExchangeSink::cancel(qpid::client::AsyncSession& session, const std::string&)
+{
+ checkDelete(session, FOR_SENDER);
+}
-QueueSink::QueueSink(const std::string& _name, bool _passive, bool _exclusive,
- bool _autoDelete, bool _durable, const FieldTable& _options) :
- name(_name), passive(_passive), exclusive(_exclusive),
- autoDelete(_autoDelete), durable(_durable), options(_options) {}
+QueueSink::QueueSink(const Address& address) : Queue(address) {}
void QueueSink::declare(qpid::client::AsyncSession& session, const std::string&)
{
- //TODO: should this really by synchronous?
- if (passive) {
- sync(session).queueDeclare(arg::queue=name, arg::passive=true);
- } else {
- sync(session).queueDeclare(arg::queue=name, arg::exclusive=exclusive, arg::durable=durable,
- arg::autoDelete=autoDelete, arg::arguments=options);
- }
+ checkCreate(session, FOR_SENDER);
+ checkAssert(session, FOR_SENDER);
}
void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
{
@@ -360,9 +453,10 @@
m.status = session.messageTransfer(arg::content=m.message);
}
-void QueueSink::cancel(qpid::client::AsyncSession&, const std::string&) {}
-
-void translate(const Variant::Map& from, FieldTable& to);//implementation in Codecs.cpp
+void QueueSink::cancel(qpid::client::AsyncSession& session, const std::string&)
+{
+ checkDelete(session, FOR_SENDER);
+}
void convert(qpid::messaging::Message& from, qpid::client::Message& to)
{
@@ -372,7 +466,7 @@
//TODO: set other delivery properties
to.getMessageProperties().setContentType(from.getContentType());
const Address& address = from.getReplyTo();
- if (!address.value.empty()) {
+ if (!address.getName().empty()) {
to.getMessageProperties().setReplyTo(AddressResolution::convert(address));
}
translate(from.getHeaders(), to.getMessageProperties().getApplicationHeaders());
@@ -381,72 +475,292 @@
Address AddressResolution::convert(const qpid::framing::ReplyTo& rt)
{
- if (rt.getExchange().empty()) {
- if (rt.getRoutingKey().empty()) {
- return Address();//empty address
- } else {
- return Address(rt.getRoutingKey(), QUEUE_ADDRESS);
- }
+ Address address;
+ if (rt.getExchange().empty()) {//if default exchange, treat as queue
+ address.setName(rt.getRoutingKey());
+ address.setType(QUEUE_ADDRESS);
} else {
- if (rt.getRoutingKey().empty()) {
- return Address(rt.getExchange(), TOPIC_ADDRESS);
- } else {
- return Address(rt.getExchange() + DIVIDER + rt.getRoutingKey(), TOPIC_ADDRESS_AND_SUBJECT);
- }
- }
+ address.setName(rt.getExchange());
+ address.setSubject(rt.getRoutingKey());
+ address.setType(TOPIC_ADDRESS);
+ }
+ return address;
}
qpid::framing::ReplyTo AddressResolution::convert(const Address& address)
{
- if (address.type == QUEUE_ADDRESS || address.type.empty()) {
- return ReplyTo(EMPTY_STRING, address.value);
- } else if (address.type == TOPIC_ADDRESS) {
- return ReplyTo(address.value, EMPTY_STRING);
- } else if (address.type == TOPIC_ADDRESS_AND_SUBJECT) {
- //need to split the value
- string::size_type i = address.value.find(DIVIDER);
- if (i != string::npos) {
- std::string exchange = address.value.substr(0, i);
- std::string routingKey;
- if (i+1 < address.value.size()) {
- routingKey = address.value.substr(i+1);
- }
- return ReplyTo(exchange, routingKey);
- } else {
- return ReplyTo(address.value, EMPTY_STRING);
- }
+ if (address.getType() == QUEUE_ADDRESS || address.getType().empty()) {
+ return ReplyTo(EMPTY_STRING, address.getName());
+ } else if (address.getType() == TOPIC_ADDRESS) {
+ return ReplyTo(address.getName(), address.getSubject());
} else {
- QPID_LOG(notice, "Unrecognised type for reply-to: " << address.type);
- //treat as queue
- return ReplyTo(EMPTY_STRING, address.value);
+ QPID_LOG(notice, "Unrecognised type for reply-to: " << address.getType());
+ return ReplyTo(EMPTY_STRING, address.getName());//treat as queue
}
}
bool isQueue(qpid::client::Session session, const qpid::messaging::Address& address)
{
- return address.type == QUEUE_ADDRESS ||
- (address.type.empty() && session.queueQuery(address.value).getQueue() == address.value);
+ return address.getType() == QUEUE_ADDRESS ||
+ (address.getType().empty() && session.queueQuery(address.getName()).getQueue() == address.getName());
}
-bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address, std::string& subject)
+bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address)
{
- if (address.type.empty()) {
- return !session.exchangeQuery(address.value).getNotFound();
- } else if (address.type == TOPIC_ADDRESS) {
- return true;
- } else if (address.type == TOPIC_ADDRESS_AND_SUBJECT) {
- string::size_type i = address.value.find(DIVIDER);
- if (i != string::npos) {
- std::string exchange = address.value.substr(0, i);
- if (i+1 < address.value.size()) {
- subject = address.value.substr(i+1);
- }
- }
+ if (address.getType().empty()) {
+ return !session.exchangeQuery(address.getName()).getNotFound();
+ } else if (address.getType() == TOPIC_ADDRESS) {
return true;
} else {
return false;
}
}
+// Dispatches on the filter's variant type: a map or a list is handed to the
+// matching bind() overload; any other type is used, as a string, as the
+// binding key on this exchange.
+void Subscription::bind(const Variant& filter)
+{
+    if (filter.getType() == qpid::messaging::VAR_MAP) {
+        bind(filter.asMap());
+    } else if (filter.getType() == qpid::messaging::VAR_LIST) {
+        bind(filter.asList());
+    } else {
+        add(name, filter.asString());
+    }
+}
+
+// Adds a single binding whose arguments are the translated filter map; the
+// subscription queue name is passed as the binding key.
+void Subscription::bind(const Variant::Map& filter)
+{
+    qpid::framing::FieldTable bindingArguments;
+    translate(filter, bindingArguments);
+    add(name, queue, bindingArguments);
+}
+
+// Binds every element of the list in order, each through bind(const Variant&).
+void Subscription::bind(const Variant::List& filter)
+{
+    Variant::List::const_iterator entry = filter.begin();
+    while (entry != filter.end()) {
+        bind(*entry);
+        ++entry;
+    }
+}
+
+// Adds the binding intended to match all messages for the given exchange type.
+// Topic uses the wildcard key and fanout uses the queue name as key. Headers
+// and xml exchanges currently add nothing; any other type throws.
+void Subscription::bindSpecial(const std::string& exchangeType)
+{
+    if (exchangeType == TOPIC_EXCHANGE) {
+        add(name, WILDCARD_ANY);
+        return;
+    }
+    if (exchangeType == FANOUT_EXCHANGE) {
+        add(name, queue);
+        return;
+    }
+    if (exchangeType == HEADERS_EXCHANGE) {
+        //TODO: add special binding for headers exchange to match all messages
+        return;
+    }
+    if (exchangeType == XML_EXCHANGE) {
+        //TODO: add special binding for xml exchange to match all messages
+        return;
+    }
+    //E.g. direct
+    throw qpid::Exception(QPID_MSG("Cannot create binding to match all messages for exchange of type " << exchangeType));
+}
+
+// Captures the node name and the create/assert/delete policy options from the
+// address.
+Node::Node(const Address& address)
+    : name(address.getName()),
+      createPolicy(address.getOption(CREATE)),
+      assertPolicy(address.getOption(ASSERT)),
+      deletePolicy(address.getOption(DELETE))
+{
+}
+
+// Starts with every queue flag off, then lets configure() override them from
+// the address's node properties.
+Queue::Queue(const Address& a)
+    : Node(a),
+      durable(false),
+      autoDelete(false),
+      exclusive(false)
+{
+    configure(a);
+}
+
+// Declares the queue with the configured properties when the create policy is
+// enabled for this mode; otherwise issues a passive declare. A qpid::Exception
+// from either call is rethrown as InvalidAddress.
+void Queue::checkCreate(qpid::client::AsyncSession& session, CheckMode mode)
+{
+    if (!enabled(createPolicy, mode)) {
+        //creation not permitted by policy: passive declare only
+        try {
+            sync(session).queueDeclare(arg::queue=name, arg::passive=true);
+        } catch (const qpid::Exception& e) {
+            throw InvalidAddress((boost::format("Queue %1% does not exist; %2%") % name % e.what()).str());
+        }
+        return;
+    }
+
+    QPID_LOG(debug, "Auto-creating queue '" << name << "'");
+    try {
+        sync(session).queueDeclare(arg::queue=name,
+                                   arg::durable=durable,
+                                   arg::autoDelete=autoDelete,
+                                   arg::exclusive=exclusive,
+                                   arg::alternateExchange=alternateExchange,
+                                   arg::arguments=arguments);
+    } catch (const qpid::Exception& e) {
+        throw InvalidAddress((boost::format("Could not create queue %1%; %2%") % name % e.what()).str());
+    }
+}
+
+// Deletes the queue (synchronously) if the delete policy is enabled for this
+// mode; does nothing otherwise.
+void Queue::checkDelete(qpid::client::AsyncSession& session, CheckMode mode)
+{
+    if (!enabled(deletePolicy, mode)) return;
+
+    QPID_LOG(debug, "Auto-deleting queue '" << name << "'");
+    sync(session).queueDelete(arg::queue=name);
+}
+
+// When the assert policy is enabled for this mode, queries the queue and
+// throws InvalidAddress on the first requested property the broker does not
+// report. Only requested properties are checked: flags that are set, a
+// non-empty alternate exchange, and each configured argument.
+void Queue::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
+{
+    if (!enabled(assertPolicy, mode)) return;
+
+    QueueQueryResult result = sync(session).queueQuery(name);
+    if (result.getQueue() != name) {
+        throw InvalidAddress((boost::format("Queue not found: %1%") % name).str());
+    }
+    if (durable && !result.getDurable()) {
+        throw InvalidAddress((boost::format("Queue not durable: %1%") % name).str());
+    }
+    if (autoDelete && !result.getAutoDelete()) {
+        throw InvalidAddress((boost::format("Queue not set to auto-delete: %1%") % name).str());
+    }
+    if (exclusive && !result.getExclusive()) {
+        throw InvalidAddress((boost::format("Queue not exclusive: %1%") % name).str());
+    }
+    if (!alternateExchange.empty() && result.getAlternateExchange() != alternateExchange) {
+        throw InvalidAddress((boost::format("Alternate exchange does not match for %1%, expected %2%, got %3%")
+                              % name % alternateExchange % result.getAlternateExchange()).str());
+    }
+    for (FieldTable::ValueMap::const_iterator expected = arguments.begin(); expected != arguments.end(); ++expected) {
+        FieldTable::ValuePtr actual = result.getArguments().get(expected->first);
+        if (!actual) {
+            throw InvalidAddress((boost::format("Option %1% not set for %2%") % expected->first % name).str());
+        }
+        //compare the values themselves, not the handles
+        if (*expected->second != *actual) {
+            throw InvalidAddress((boost::format("Option %1% does not match for %2%, expected %3%, got %4%")
+                                  % expected->first % name % *(expected->second) % *actual).str());
+        }
+    }
+}
+
+// Reads the queue settings from the address's node-properties option, if that
+// option is present; otherwise leaves the current values untouched.
+void Queue::configure(const Address& address)
+{
+    const Variant& properties = address.getOption(NODE_PROPERTIES);
+    if (properties.isVoid()) return;
+
+    Variant::Map settings = properties.asMap();
+    durable = settings[DURABLE];
+    autoDelete = settings[xamqp::AUTO_DELETE];
+    exclusive = settings[xamqp::EXCLUSIVE];
+    alternateExchange = settings[xamqp::ALTERNATE_EXCHANGE].asString();
+
+    const Variant& extraArguments = settings[xamqp::ARGUMENTS];
+    if (!extraArguments.isVoid()) {
+        translate(extraArguments.asMap(), arguments);
+    }
+}
+
+// Starts with durable and auto-delete off, then lets configure() override
+// them from the address's node properties.
+Exchange::Exchange(const Address& a)
+    : Node(a),
+      durable(false),
+      autoDelete(false)
+{
+    configure(a);
+}
+
+// Declares the exchange with the configured properties when the create policy
+// is enabled for this mode; otherwise issues a passive declare. A
+// qpid::Exception from either call is rethrown as InvalidAddress.
+void Exchange::checkCreate(qpid::client::AsyncSession& session, CheckMode mode)
+{
+    if (!enabled(createPolicy, mode)) {
+        //creation not permitted by policy: passive declare only
+        try {
+            sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true);
+        } catch (const qpid::Exception& e) {
+            throw InvalidAddress((boost::format("Exchange %1% does not exist; %2%") % name % e.what()).str());
+        }
+        return;
+    }
+
+    try {
+        sync(session).exchangeDeclare(arg::exchange=name,
+                                      arg::type=type,
+                                      arg::durable=durable,
+                                      arg::autoDelete=autoDelete,
+                                      arg::alternateExchange=alternateExchange,
+                                      arg::arguments=arguments);
+    } catch (const qpid::Exception& e) {
+        throw InvalidAddress((boost::format("Could not create exchange %1%; %2%") % name % e.what()).str());
+    }
+}
+
+// Deletes the exchange (synchronously) if the delete policy is enabled for
+// this mode; does nothing otherwise.
+void Exchange::checkDelete(qpid::client::AsyncSession& session, CheckMode mode)
+{
+    if (!enabled(deletePolicy, mode)) return;
+
+    sync(session).exchangeDelete(arg::exchange=name);
+}
+
+// When the assert policy is enabled for this mode, queries the exchange and
+// throws InvalidAddress on the first requested property the broker does not
+// report: existence, type (if one was requested), durability (if requested)
+// and each configured argument.
+void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
+{
+    if (enabled(assertPolicy, mode)) {
+        ExchangeQueryResult result = sync(session).exchangeQuery(arg::exchange=name);
+        if (result.getNotFound()) {
+            throw InvalidAddress((boost::format("Exchange not found: %1%") % name).str());
+        } else {
+            if (!type.empty() && result.getType() != type) {
+                throw InvalidAddress((boost::format("Exchange %1% is of incorrect type, expected %2% but got %3%")
+                                      % name % type % result.getType()).str());
+            }
+            if (durable && !result.getDurable()) {
+                throw InvalidAddress((boost::format("Exchange not durable: %1%") % name).str());
+            }
+            //Note: Can't check auto-delete or alternate-exchange via
+            //exchange-query-result as these are not returned
+            //TODO: could use a passive declare to check alternate-exchange
+            for (FieldTable::ValueMap::const_iterator i = arguments.begin(); i != arguments.end(); ++i) {
+                FieldTable::ValuePtr v = result.getArguments().get(i->first);
+                if (!v) {
+                    throw InvalidAddress((boost::format("Option %1% not set for %2%") % i->first % name).str());
+                } else if (*i->second != *v) {
+                    //Compare the field values, not the ValuePtr handles (same
+                    //check as Queue::checkAssert): the handle returned by the
+                    //query result is not the one held in 'arguments', so a
+                    //handle comparison reports a mismatch even for equal values.
+                    throw InvalidAddress((boost::format("Option %1% does not match for %2%, expected %3%, got %4%")
+                                          % i->first % name % *(i->second) % *v).str());
+                }
+            }
+        }
+    }
+}
+
+// Reads the exchange settings from the address's node-properties option, if
+// that option is present; otherwise leaves the current values untouched.
+void Exchange::configure(const Address& address)
+{
+    const Variant& properties = address.getOption(NODE_PROPERTIES);
+    if (properties.isVoid()) return;
+
+    Variant::Map settings = properties.asMap();
+    durable = settings[DURABLE];
+    autoDelete = settings[xamqp::AUTO_DELETE];
+    type = settings[xamqp::EXCHANGE_TYPE].asString();
+    alternateExchange = settings[xamqp::ALTERNATE_EXCHANGE].asString();
+
+    const Variant& extraArguments = settings[xamqp::ARGUMENTS];
+    if (!extraArguments.isVoid()) {
+        translate(extraArguments.asMap(), arguments);
+    }
+}
+
+
+// Reports whether the given policy value applies in the given mode: for a
+// receiver the policy must be one of RECEIVER_MODES, for a sender one of
+// SENDER_MODES. Any other mode value yields false, so the function never
+// returns an uninitialised value.
+bool Node::enabled(const Variant& policy, CheckMode mode)
+{
+    switch (mode) {
+      case FOR_RECEIVER:
+        return in(policy, RECEIVER_MODES);
+      case FOR_SENDER:
+        return in(policy, SENDER_MODES);
+    }
+    return false;
+}
+
+// Convenience check: is the address's create policy enabled for this mode?
+bool Node::createEnabled(const Address& address, CheckMode mode)
+{
+    return enabled(address.getOption(CREATE), mode);
+}
+
+// Translates an options map into the given FieldTable; a void option leaves
+// the table untouched.
+void Node::convert(const Variant& options, FieldTable& arguments)
+{
+    if (options.isVoid()) return;
+
+    translate(options.asMap(), arguments);
+}
+// Policy values that Node::enabled() accepts for receivers and for senders
+// respectively.
+std::vector<std::string> Node::RECEIVER_MODES =
+    list_of<std::string>(ALWAYS) (RECEIVER);
+std::vector<std::string> Node::SENDER_MODES =
+    list_of<std::string>(ALWAYS) (SENDER);
}}} // namespace qpid::client::amqp0_10
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h Wed Nov 11 13:15:44 2009
@@ -50,13 +50,10 @@
{
public:
std::auto_ptr<MessageSource> resolveSource(qpid::client::Session session,
- const qpid::messaging::Address& address,
- const qpid::messaging::Filter* filter,
- const qpid::messaging::Variant::Map& options);
-
+ const qpid::messaging::Address& address);
+
std::auto_ptr<MessageSink> resolveSink(qpid::client::Session session,
- const qpid::messaging::Address& address,
- const qpid::messaging::Variant::Map& options);
+ const qpid::messaging::Address& address);
static qpid::messaging::Address convert(const qpid::framing::ReplyTo&);
static qpid::framing::ReplyTo convert(const qpid::messaging::Address&);
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h Wed Nov 11 13:15:44 2009
@@ -50,11 +50,11 @@
qpid::sys::Mutex lock;//used to protect data structures
qpid::sys::Semaphore semaphore;//used to coordinate reconnection
+ Sessions sessions;
qpid::client::Connection connection;
std::auto_ptr<FailoverListener> failoverListener;
qpid::Url url;
qpid::client::ConnectionSettings settings;
- Sessions sessions;
bool reconnectionEnabled;
int timeout;
int minRetryInterval;
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp Wed Nov 11 13:15:44 2009
@@ -39,7 +39,7 @@
message.setData(from.getContent());
message.getMessageProperties().setContentType(from.getContentType());
const Address& address = from.getReplyTo();
- if (!address.value.empty()) {
+ if (address) {
message.getMessageProperties().setReplyTo(AddressResolution::convert(address));
}
translate(from.getHeaders(), message.getMessageProperties().getApplicationHeaders());
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp Wed Nov 11 13:15:44 2009
@@ -103,7 +103,7 @@
session = s;
if (state == UNRESOLVED) {
- source = resolver.resolveSource(session, address, filter, options);
+ source = resolver.resolveSource(session, address);
state = STOPPED;//TODO: if session is started, go straight to started
}
if (state == CANCELLED) {
@@ -136,11 +136,9 @@
}
ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name,
- const qpid::messaging::Address& a,
- const qpid::messaging::Filter* f,
- const qpid::messaging::Variant::Map& o) :
+ const qpid::messaging::Address& a) :
- parent(p), destination(name), address(a), filter(f), options(o), byteCredit(0xFFFFFFFF),
+ parent(p), destination(name), address(a), byteCredit(0xFFFFFFFF),
state(UNRESOLVED), capacity(0), listener(0), window(0) {}
bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout)
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h Wed Nov 11 13:15:44 2009
@@ -22,7 +22,6 @@
*
*/
#include "qpid/messaging/Address.h"
-#include "qpid/messaging/Filter.h"
#include "qpid/messaging/Message.h"
#include "qpid/messaging/ReceiverImpl.h"
#include "qpid/messaging/Variant.h"
@@ -48,9 +47,7 @@
enum State {UNRESOLVED, STOPPED, STARTED, CANCELLED};
ReceiverImpl(SessionImpl& parent, const std::string& name,
- const qpid::messaging::Address& address,
- const qpid::messaging::Filter* filter,
- const qpid::messaging::Variant::Map& options);
+ const qpid::messaging::Address& address);
void init(qpid::client::AsyncSession session, AddressResolution& resolver);
bool get(qpid::messaging::Message& message, qpid::sys::Duration timeout);
@@ -72,8 +69,6 @@
SessionImpl& parent;
const std::string destination;
const qpid::messaging::Address address;
- const qpid::messaging::Filter* filter;
- const qpid::messaging::Variant::Map options;
const uint32_t byteCredit;
State state;
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp Wed Nov 11 13:15:44 2009
@@ -29,9 +29,8 @@
namespace amqp0_10 {
SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name,
- const qpid::messaging::Address& _address,
- const qpid::messaging::Variant::Map& _options) :
- parent(_parent), name(_name), address(_address), options(_options), state(UNRESOLVED),
+ const qpid::messaging::Address& _address) :
+ parent(_parent), name(_name), address(_address), state(UNRESOLVED),
capacity(50), window(0), flushed(false) {}
void SenderImpl::send(const qpid::messaging::Message& message)
@@ -63,7 +62,7 @@
{
session = s;
if (state == UNRESOLVED) {
- sink = resolver.resolveSink(session, address, options);
+ sink = resolver.resolveSink(session, address);
state = ACTIVE;
}
if (state == CANCELLED) {
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h Wed Nov 11 13:15:44 2009
@@ -47,8 +47,7 @@
enum State {UNRESOLVED, ACTIVE, CANCELLED};
SenderImpl(SessionImpl& parent, const std::string& name,
- const qpid::messaging::Address& address,
- const qpid::messaging::Variant::Map& options);
+ const qpid::messaging::Address& address);
void send(const qpid::messaging::Message&);
void cancel();
void setCapacity(uint32_t);
@@ -60,7 +59,6 @@
SessionImpl& parent;
const std::string name;
const qpid::messaging::Address address;
- const qpid::messaging::Variant::Map options;
State state;
std::auto_ptr<MessageSink> sink;
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Wed Nov 11 13:15:44 2009
@@ -28,7 +28,6 @@
#include "qpid/Exception.h"
#include "qpid/log/Statement.h"
#include "qpid/messaging/Address.h"
-#include "qpid/messaging/Filter.h"
#include "qpid/messaging/Message.h"
#include "qpid/messaging/MessageImpl.h"
#include "qpid/messaging/MessageListener.h"
@@ -132,36 +131,22 @@
{
qpid::messaging::Receiver result;
const qpid::messaging::Address& address;
- const Filter* filter;
- const qpid::messaging::Variant::Map& options;
- CreateReceiver(SessionImpl& i, const qpid::messaging::Address& a, const Filter* f,
- const qpid::messaging::Variant::Map& o) :
- Command(i), address(a), filter(f), options(o) {}
- void operator()() { result = impl.createReceiverImpl(address, filter, options); }
+ CreateReceiver(SessionImpl& i, const qpid::messaging::Address& a) :
+ Command(i), address(a) {}
+ void operator()() { result = impl.createReceiverImpl(address); }
};
-Receiver SessionImpl::createReceiver(const qpid::messaging::Address& address, const VariantMap& options)
-{
- CreateReceiver f(*this, address, 0, options);
- while (!execute(f)) {}
- return f.result;
-}
-
-Receiver SessionImpl::createReceiver(const qpid::messaging::Address& address,
- const Filter& filter, const VariantMap& options)
-{
- CreateReceiver f(*this, address, &filter, options);
- while (!execute(f)) {}
- return f.result;
+Receiver SessionImpl::createReceiver(const qpid::messaging::Address& address)
+{
+ return get1<CreateReceiver, Receiver>(address);
}
-Receiver SessionImpl::createReceiverImpl(const qpid::messaging::Address& address,
- const Filter* filter, const VariantMap& options)
+Receiver SessionImpl::createReceiverImpl(const qpid::messaging::Address& address)
{
- std::string name = address;
+ std::string name = address.getName();
getFreeKey(name, receivers);
- Receiver receiver(new ReceiverImpl(*this, name, address, filter, options));
+ Receiver receiver(new ReceiverImpl(*this, name, address));
getImplPtr<Receiver, ReceiverImpl>(receiver)->init(session, resolver);
receivers[name] = receiver;
return receiver;
@@ -171,26 +156,22 @@
{
qpid::messaging::Sender result;
const qpid::messaging::Address& address;
- const qpid::messaging::Variant::Map& options;
- CreateSender(SessionImpl& i, const qpid::messaging::Address& a,
- const qpid::messaging::Variant::Map& o) :
- Command(i), address(a), options(o) {}
- void operator()() { result = impl.createSenderImpl(address, options); }
+ CreateSender(SessionImpl& i, const qpid::messaging::Address& a) :
+ Command(i), address(a) {}
+ void operator()() { result = impl.createSenderImpl(address); }
};
-Sender SessionImpl::createSender(const qpid::messaging::Address& address, const VariantMap& options)
+Sender SessionImpl::createSender(const qpid::messaging::Address& address)
{
- CreateSender f(*this, address, options);
- while (!execute(f)) {}
- return f.result;
+ return get1<CreateSender, Sender>(address);
}
-Sender SessionImpl::createSenderImpl(const qpid::messaging::Address& address, const VariantMap& options)
+Sender SessionImpl::createSenderImpl(const qpid::messaging::Address& address)
{
- std::string name = address;
+ std::string name = address.getName();
getFreeKey(name, senders);
- Sender sender(new SenderImpl(*this, name, address, options));
+ Sender sender(new SenderImpl(*this, name, address));
getImplPtr<Sender, SenderImpl>(sender)->init(session, resolver);
senders[name] = sender;
return sender;
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h Wed Nov 11 13:15:44 2009
@@ -63,13 +63,8 @@
void sync();
void flush();
qpid::messaging::Address createTempQueue(const std::string& baseName);
- qpid::messaging::Sender createSender(const qpid::messaging::Address& address,
- const qpid::messaging::VariantMap& options);
- qpid::messaging::Receiver createReceiver(const qpid::messaging::Address& address,
- const qpid::messaging::VariantMap& options);
- qpid::messaging::Receiver createReceiver(const qpid::messaging::Address& address,
- const qpid::messaging::Filter& filter,
- const qpid::messaging::VariantMap& options);
+ qpid::messaging::Sender createSender(const qpid::messaging::Address& address);
+ qpid::messaging::Receiver createReceiver(const qpid::messaging::Address& address);
void* getLastConfirmedSent();
void* getLastConfirmedAcknowledged();
@@ -129,11 +124,8 @@
void closeImpl();
void syncImpl();
void flushImpl();
- qpid::messaging::Sender createSenderImpl(const qpid::messaging::Address& address,
- const qpid::messaging::VariantMap& options);
- qpid::messaging::Receiver createReceiverImpl(const qpid::messaging::Address& address,
- const qpid::messaging::Filter* filter,
- const qpid::messaging::VariantMap& options);
+ qpid::messaging::Sender createSenderImpl(const qpid::messaging::Address& address);
+ qpid::messaging::Receiver createReceiverImpl(const qpid::messaging::Address& address);
uint32_t availableImpl(const std::string* destination);
uint32_t pendingAckImpl(const std::string* destination);
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp Wed Nov 11 13:15:44 2009
@@ -19,28 +19,293 @@
*
*/
#include "qpid/messaging/Address.h"
+#include "qpid/framing/Uuid.h"
+#include <sstream>
+#include <boost/format.hpp>
namespace qpid {
namespace messaging {
-Address::Address() {}
-Address::Address(const std::string& address) : value(address) {}
-Address::Address(const std::string& address, const std::string& t) : value(address), type(t) {}
-Address::operator const std::string&() const { return value; }
-const std::string& Address::toStr() const { return value; }
-Address::operator bool() const { return !value.empty(); }
-bool Address::operator !() const { return value.empty(); }
+namespace {
+// Written between the name and the subject by Address::toStr().
+const std::string SUBJECT_DIVIDER = "/";
+// NOTE(review): not referenced in the visible part of this file - confirm before removing.
+const std::string SPACE = " ";
+// Key under which Address::getType()/setType() keep the type in the options map.
+const std::string TYPE = "type";
+}
+/**
+ * Data held behind an Address: the name, the subject (empty when there
+ * is none) and the map of options.
+ */
+class AddressImpl
+{
+  public:
+    std::string name;
+    std::string subject;
+    Variant::Map options;
+
+    AddressImpl() {}
+    AddressImpl(const std::string& nodeName,
+                const std::string& nodeSubject,
+                const Variant::Map& nodeOptions)
+        : name(nodeName), subject(nodeSubject), options(nodeOptions) {}
+};
+
+// Recursive-descent style parser that fills in an Address from its textual form.
+// The read* helpers return true when they consumed the construct they look for,
+// false when it is not present; malformed input is reported through error().
+class AddressParser
+{
+ public:
+ AddressParser(const std::string&);
+ // Parses the input into address; throws MalformedAddress (via error()) on bad input.
+ bool parse(Address& address);
+ private:
+ // Reference to the caller's string - it is not copied, so it must outlive the parser.
+ const std::string& input;
+ // Index into input of the next character to examine.
+ std::string::size_type current;
+ // Characters that terminate a word (see isreserved()).
+ static const std::string RESERVED;
+
+ // Skips whitespace, then consumes c if it is the next character.
+ bool readChar(char c);
+ bool readQuotedString(Variant& value);
+ bool readString(Variant& value, char delimiter);
+ // Skips leading whitespace, then reads a run of non-whitespace, non-reserved characters.
+ bool readWord(std::string& word);
+ bool readSimpleValue(Variant& word);
+ bool readKey(std::string& key);
+ bool readValue(Variant& value);
+ bool readKeyValuePair(Variant::Map& map);
+ bool readMap(Variant& value);
+ bool readList(Variant& value);
+ // Always throws MalformedAddress; the bool return type lets it appear in boolean expressions.
+ bool error(const std::string& message);
+ // True when current is at or past the end of input.
+ bool eos();
+ // Both test the character at current via input.at(), so call them only when !eos().
+ bool iswhitespace();
+ bool isreserved();
+};
+
+/** Creates an empty address (no name, subject or options). */
+Address::Address() : impl(new AddressImpl()) {}
+
+/**
+ * Builds an address by parsing its textual form.
+ *
+ * @param address text to parse; an empty string leaves the address empty
+ * @throws MalformedAddress if the text cannot be parsed
+ */
+Address::Address(const std::string& address) : impl(new AddressImpl())
+{
+    try {
+        AddressParser parser(address);
+        parser.parse(*this);
+    } catch (...) {
+        // The destructor is not run for an object whose constructor throws,
+        // so release impl here before letting the exception propagate.
+        delete impl;
+        throw;
+    }
+}
+/** Builds an address from its parts; the type is recorded through setType(). */
+Address::Address(const std::string& name, const std::string& subject, const Variant::Map& options,
+                 const std::string& type)
+    : impl(new AddressImpl(name, subject, options))
+{
+    setType(type);
+}
+
+/** Copy constructor: the new address gets its own AddressImpl holding a's values. */
+Address::Address(const Address& a)
+    : impl(new AddressImpl(a.impl->name, a.impl->subject, a.impl->options))
+{
+}
+
+Address::~Address()
+{
+    delete impl;
+}
+
+/** Copies a's name, subject and options into this address's existing impl. */
+Address& Address::operator=(const Address& a)
+{
+    *impl = *a.impl;
+    return *this;
+}
+
+
+/**
+ * Renders the address as the name, followed by "/subject" when the subject
+ * is non-empty and by " {options}" when there are any options.
+ */
+std::string Address::toStr() const
+{
+    std::ostringstream text;
+    text << impl->name;
+    if (!impl->subject.empty()) {
+        text << SUBJECT_DIVIDER << impl->subject;
+    }
+    if (!impl->options.empty()) {
+        text << " {" << impl->options << "}";
+    }
+    return text.str();
+}
+/** An address converts to true when its name is non-empty. */
+Address::operator bool() const
+{
+    return !impl->name.empty();
+}
+
+/** True when the name is empty. */
+bool Address::operator !() const
+{
+    return impl->name.empty();
+}
+
+// Plain accessors forwarding to the fields of impl.
+const std::string& Address::getName() const
+{
+    return impl->name;
+}
+
+void Address::setName(const std::string& name)
+{
+    impl->name = name;
+}
+
+const std::string& Address::getSubject() const
+{
+    return impl->subject;
+}
+
+/** True when a non-empty subject has been set. */
+bool Address::hasSubject() const
+{
+    return !impl->subject.empty();
+}
+
+void Address::setSubject(const std::string& subject)
+{
+    impl->subject = subject;
+}
+
+const Variant::Map& Address::getOptions() const
+{
+    return impl->options;
+}
+
+Variant::Map& Address::getOptions()
+{
+    return impl->options;
+}
+
+void Address::setOptions(const Variant::Map& options)
+{
+    impl->options = options;
+}
+
+
+namespace{
+// Returned by Address::getOption() when the requested key is not in the options map.
+const Variant EMPTY_VARIANT;
+// Returned by Address::getType() when the type option is void.
+const std::string EMPTY_STRING;
+}
-const std::string TYPE_SEPARATOR(":");
+/** Returns the "type" option as a string, or an empty string when that option is void. */
+std::string Address::getType() const
+{
+    const Variant& stored = getOption(TYPE);
+    if (stored.isVoid()) {
+        return EMPTY_STRING;
+    }
+    return stored.asString();
+}
+
+/** Stores type under the "type" key of the options map. */
+void Address::setType(const std::string& type)
+{
+    impl->options[TYPE] = type;
+}
+
+/**
+ * Looks up an option without inserting it.
+ *
+ * @return the stored value, or a reference to EMPTY_VARIANT if key is absent
+ */
+const Variant& Address::getOption(const std::string& key) const
+{
+    const Variant::Map& opts = impl->options;
+    Variant::Map::const_iterator found = opts.find(key);
+    return (found != opts.end()) ? found->second : EMPTY_VARIANT;
+}
std::ostream& operator<<(std::ostream& out, const Address& address)
{
- if (!address.type.empty()) {
- out << address.type;
- out << TYPE_SEPARATOR;
- }
- out << address.value;
+ out << address.toStr();
return out;
}
+// Both exception types simply forward their message to Exception.
+InvalidAddress::InvalidAddress(const std::string& msg)
+    : Exception(msg)
+{
+}
+
+MalformedAddress::MalformedAddress(const std::string& msg)
+    : Exception(msg)
+{
+}
+
+/** Positions the parser at the start of s. */
+AddressParser::AddressParser(const std::string& s)
+    : input(s),
+      current(0)
+{
+}
+
+/**
+ * Reports a parse failure. Never returns normally; the bool return type
+ * only allows use in expressions such as "readChar(']') || error(...)".
+ *
+ * @throws MalformedAddress always, carrying message
+ */
+bool AddressParser::error(const std::string& message)
+{
+    //TODO: add more debug detail to error message (position etc)
+    throw MalformedAddress(message);
+}
+
+/**
+ * Parses "name[/subject][{options}]" from the input into address.
+ *
+ * A name starting with '#' is prefixed with a freshly generated UUID.
+ * An empty input is accepted and leaves address unchanged.
+ *
+ * @return true on success
+ * @throws MalformedAddress if no name is found in a non-empty input, if a
+ *         '/' is not followed by a subject, or if the options are malformed
+ */
+bool AddressParser::parse(Address& address)
+{
+    std::string name;
+    if (!readWord(name)) {
+        return input.empty() || error("Expected name");
+    }
+
+    // readWord() only succeeds with a non-empty word, so name[0] is valid.
+    if (name[0] == '#') name = qpid::framing::Uuid(true).str() + name;
+    address.setName(name);
+
+    if (readChar('/')) {
+        std::string subject;
+        if (!readWord(subject)) {
+            return error("Expected subject after /");
+        }
+        address.setSubject(subject);
+    }
+
+    Variant options = Variant::Map();
+    if (readMap(options)) {
+        address.setOptions(options.asMap());
+    }
+    // NOTE(review): anything left in the input after this point is not examined.
+    return true;
+}
+
+/**
+ * Reads a '[' ... ']' list of comma separated values into value.
+ *
+ * @return false (leaving value untouched) if the next non-whitespace
+ *         character is not '['; true once the list has been read
+ * @throws MalformedAddress if an item is not a valid value or the closing
+ *         ']' is missing
+ */
+bool AddressParser::readList(Variant& value)
+{
+    if (!readChar('[')) return false;
+    value = Variant::List();
+    // readValue() throws instead of returning false when no value is present,
+    // so an empty list "[]" has to be recognised before asking for an item.
+    if (readChar(']')) return true;
+    Variant item;
+    while (readValue(item)) {
+        value.asList().push_back(item);
+        if (!readChar(',')) break;
+    }
+    return readChar(']') || error("Unmatched '['!");
+}
+
+/**
+ * Reads a '{' ... '}' map of comma separated key:value pairs into value.
+ *
+ * @return false (leaving value untouched) if the next non-whitespace
+ *         character is not '{'; true once the map has been read
+ * @throws MalformedAddress on a bad pair or a missing closing '}'
+ */
+bool AddressParser::readMap(Variant& value)
+{
+    if (!readChar('{')) return false;
+
+    value = Variant::Map();
+    for (;;) {
+        if (!readKeyValuePair(value.asMap())) break;
+        if (!readChar(',')) break;
+    }
+    return readChar('}') || error("Unmatched '{'!");
+}
+
+/**
+ * Reads one "key:value" pair and stores it in map.
+ *
+ * @return false if no key is present; true after the pair has been stored
+ * @throws MalformedAddress if a key is not followed by ':' and a value
+ */
+bool AddressParser::readKeyValuePair(Variant::Map& map)
+{
+    std::string key;
+    if (!readKey(key)) return false;
+
+    Variant value;
+    if (!readChar(':') || !readValue(value)) {
+        return error("Bad key-value pair!");
+    }
+    map[key] = value;
+    return true;
+}
+
+/** A key is simply a word; see readWord(). */
+bool AddressParser::readKey(std::string& key)
+{
+    const bool found = readWord(key);
+    return found;
+}
+
+/**
+ * Reads a value, trying in order: simple word/number, quoted string, map, list.
+ *
+ * @return true when one of the forms was read
+ * @throws MalformedAddress ("Expected value") when none of them matches
+ */
+bool AddressParser::readValue(Variant& value)
+{
+    if (readSimpleValue(value)) return true;
+    if (readQuotedString(value)) return true;
+    if (readMap(value)) return true;
+    if (readList(value)) return true;
+    return error("Expected value");
+}
+
+/**
+ * Reads a string enclosed in a pair of delimiter characters into value.
+ * There is no escape mechanism: the first delimiter after the opening one
+ * ends the string. An empty string (two adjacent delimiters) is accepted.
+ *
+ * @return false if the next non-whitespace character is not delimiter;
+ *         true once the closing delimiter has been consumed
+ * @throws MalformedAddress if the closing delimiter is missing
+ */
+bool AddressParser::readString(Variant& value, char delimiter)
+{
+    if (!readChar(delimiter)) return false;
+
+    // readChar() has already stepped past the opening delimiter, so the
+    // content starts at the current position. Scanning must begin here too:
+    // skipping a character would miss the closing delimiter of "".
+    const std::string::size_type start = current;
+    while (!eos()) {
+        if (input.at(current) == delimiter) {
+            value = input.substr(start, current - start);
+            ++current; // consume the closing delimiter
+            return true;
+        }
+        ++current;
+    }
+    return error("Unmatched delimiter");
+}
+
+/** Reads a string quoted with either double or single quotes. */
+bool AddressParser::readQuotedString(Variant& value)
+{
+    if (readString(value, '"')) return true;
+    return readString(value, '\'');
+}
+
+/**
+ * Reads an unquoted word and stores it in value, converted to a 64-bit
+ * integer if possible, otherwise to a double if possible, otherwise kept
+ * as a string.
+ *
+ * @return false if no word is present, true otherwise
+ */
+bool AddressParser::readSimpleValue(Variant& value)
+{
+    std::string word;
+    if (!readWord(word)) return false;
+
+    value = word;
+    // Try the numeric interpretations in turn; a failed conversion leaves
+    // value holding the string.
+    try {
+        value = value.asInt64();
+        return true;
+    } catch (const InvalidConversion&) {}
+    try {
+        value = value.asDouble();
+        return true;
+    } catch (const InvalidConversion&) {}
+    return true;
+}
+
+/**
+ * Reads a word: the longest run of characters that are neither whitespace
+ * nor reserved, after skipping any leading whitespace.
+ *
+ * @return true and sets value if at least one character was read;
+ *         false (value untouched) otherwise
+ */
+bool AddressParser::readWord(std::string& value)
+{
+    // Leading whitespace is not part of the word.
+    for (; !eos() && iswhitespace(); ++current) {}
+
+    const std::string::size_type begin = current;
+    for (; !eos() && !iswhitespace() && !isreserved(); ++current) {}
+
+    if (current == begin) return false;
+    value = input.substr(begin, current - begin);
+    return true;
+}
+
+/**
+ * Skips whitespace, then consumes c if it is the next character.
+ * The skipped whitespace stays consumed even when false is returned.
+ *
+ * @return true if c was consumed; false if another character or the end
+ *         of input was found
+ */
+bool AddressParser::readChar(char c)
+{
+    while (!eos() && iswhitespace()) ++current;
+
+    if (eos() || input.at(current) != c) return false;
+    ++current;
+    return true;
+}
+
+/** True if the character at the current position is whitespace. Requires !eos(). */
+bool AddressParser::iswhitespace()
+{
+    // isspace() requires a value representable as unsigned char (or EOF);
+    // a plain char holding a byte >= 0x80 may be negative, so convert first.
+    return ::isspace(static_cast<unsigned char>(input.at(current))) != 0;
+}
+
+/** True if the character at the current position is one of RESERVED. Requires !eos(). */
+bool AddressParser::isreserved()
+{
+    const char next = input.at(current);
+    return RESERVED.find(next) != std::string::npos;
+}
+
+/** True once every character of the input has been consumed. */
+bool AddressParser::eos()
+{
+    return !(current < input.size());
+}
+
+// Characters that end a word: both quote characters, braces, brackets, comma, colon and slash.
+const std::string AddressParser::RESERVED = "\'\"{}[],:/";
}} // namespace qpid::messaging
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp Wed Nov 11 13:15:44 2009
@@ -20,7 +20,6 @@
*/
#include "qpid/messaging/Session.h"
#include "qpid/messaging/Address.h"
-#include "qpid/messaging/Filter.h"
#include "qpid/messaging/Message.h"
#include "qpid/messaging/Sender.h"
#include "qpid/messaging/Receiver.h"
@@ -48,30 +47,22 @@
void Session::reject(Message& m) { impl->reject(m); }
void Session::close() { impl->close(); }
-Sender Session::createSender(const Address& address, const VariantMap& options)
+Sender Session::createSender(const Address& address)
{
- return impl->createSender(address, options);
+ return impl->createSender(address);
}
-Receiver Session::createReceiver(const Address& address, const VariantMap& options)
+Receiver Session::createReceiver(const Address& address)
{
- return impl->createReceiver(address, options);
-}
-Receiver Session::createReceiver(const Address& address, const Filter& filter, const VariantMap& options)
-{
- return impl->createReceiver(address, filter, options);
+ return impl->createReceiver(address);
}
-Sender Session::createSender(const std::string& address, const VariantMap& options)
-{
- return impl->createSender(Address(address), options);
-}
-Receiver Session::createReceiver(const std::string& address, const VariantMap& options)
+Sender Session::createSender(const std::string& address)
{
- return impl->createReceiver(Address(address), options);
+ return impl->createSender(Address(address));
}
-Receiver Session::createReceiver(const std::string& address, const Filter& filter, const VariantMap& options)
+Receiver Session::createReceiver(const std::string& address)
{
- return impl->createReceiver(Address(address), filter, options);
+ return impl->createReceiver(Address(address));
}
Address Session::createTempQueue(const std::string& baseName)
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h Wed Nov 11 13:15:44 2009
@@ -23,7 +23,6 @@
*/
#include "qpid/RefCounted.h"
#include <string>
-#include "qpid/messaging/Variant.h"
#include "qpid/sys/Time.h"
namespace qpid {
@@ -53,9 +52,8 @@
virtual Message fetch(qpid::sys::Duration timeout) = 0;
virtual bool dispatch(qpid::sys::Duration timeout) = 0;
virtual Address createTempQueue(const std::string& baseName) = 0;
- virtual Sender createSender(const Address& address, const VariantMap& options) = 0;
- virtual Receiver createReceiver(const Address& address, const VariantMap& options) = 0;
- virtual Receiver createReceiver(const Address& address, const Filter& filter, const VariantMap& options) = 0;
+ virtual Sender createSender(const Address& address) = 0;
+ virtual Receiver createReceiver(const Address& address) = 0;
virtual uint32_t available() = 0;
virtual uint32_t pendingAck() = 0;
private:
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Variant.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Variant.cpp?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Variant.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Variant.cpp Wed Nov 11 13:15:44 2009
@@ -529,6 +529,7 @@
}
VariantType Variant::getType() const { return impl->getType(); }
+bool Variant::isVoid() const { return impl->getType() == VAR_VOID; }
bool Variant::asBool() const { return impl->asBool(); }
uint8_t Variant::asUint8() const { return impl->asUint8(); }
uint16_t Variant::asUint16() const { return impl->asUint16(); }
Added: qpid/trunk/qpid/cpp/src/tests/Address.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Address.cpp?rev=834869&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Address.cpp (added)
+++ qpid/trunk/qpid/cpp/src/tests/Address.cpp Wed Nov 11 13:15:44 2009
@@ -0,0 +1,91 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/Variant.h"
+
+#include "unit_test.h"
+
+using namespace qpid::messaging;
+
+namespace qpid {
+namespace tests {
+
+QPID_AUTO_TEST_SUITE(AddressSuite)
+
+// A bare name parses to an address carrying just that name.
+QPID_AUTO_TEST_CASE(testParseNameOnly)
+{
+    const std::string expectedName("my-topic");
+    Address address("my-topic");
+    BOOST_CHECK_EQUAL(expectedName, address.getName());
+}
+
+// "name/subject" is split at the slash.
+QPID_AUTO_TEST_CASE(testParseSubject)
+{
+    const std::string expectedName("my-topic");
+    const std::string expectedSubject("my-subject");
+    Address address("my-topic/my-subject");
+    BOOST_CHECK_EQUAL(expectedName, address.getName());
+    BOOST_CHECK_EQUAL(expectedSubject, address.getSubject());
+}
+
+// Options of word, integer and quoted-string type are parsed from the map.
+QPID_AUTO_TEST_CASE(testParseOptions)
+{
+    Address address("my-topic {a:bc, x:101, y:'a string'}");
+    BOOST_CHECK_EQUAL(std::string("my-topic"), address.getName());
+    BOOST_CHECK_EQUAL(std::string("bc"), address.getOption("a").asString());
+    // Compare against the same type asInt64() returns to avoid a signed/unsigned mix.
+    BOOST_CHECK_EQUAL(static_cast<int64_t>(101), address.getOption("x").asInt64());
+    BOOST_CHECK_EQUAL(std::string("a string"), address.getOption("y").asString());
+}
+
+// A subject and an options map can both follow the name.
+QPID_AUTO_TEST_CASE(testParseSubjectAndOptions)
+{
+    Address address("my-topic/my-subject {a:bc, x:101, y:'a string'}");
+    BOOST_CHECK_EQUAL(std::string("my-topic"), address.getName());
+    BOOST_CHECK_EQUAL(std::string("my-subject"), address.getSubject());
+    BOOST_CHECK_EQUAL(std::string("bc"), address.getOption("a").asString());
+    // Compare against the same type asInt64() returns to avoid a signed/unsigned mix.
+    BOOST_CHECK_EQUAL(static_cast<int64_t>(101), address.getOption("x").asInt64());
+    BOOST_CHECK_EQUAL(std::string("a string"), address.getOption("y").asString());
+}
+
+// An option value may itself be a map.
+QPID_AUTO_TEST_CASE(testParseNestedOptions)
+{
+    Address address("my-topic {a:{p:202, q:'another string'}, x:101, y:'a string'}");
+    BOOST_CHECK_EQUAL(std::string("my-topic"), address.getName());
+    // Compare against the same type asInt64() returns to avoid a signed/unsigned mix.
+    BOOST_CHECK_EQUAL(static_cast<int64_t>(202), address.getOptions()["a"].asMap()["p"].asInt64());
+    BOOST_CHECK_EQUAL(std::string("another string"), address.getOptions()["a"].asMap()["q"].asString());
+    BOOST_CHECK_EQUAL(static_cast<int64_t>(101), address.getOption("x").asInt64());
+    BOOST_CHECK_EQUAL(std::string("a string"), address.getOption("y").asString());
+}
+
+// An option value may be a list mixing integers and quoted strings.
+QPID_AUTO_TEST_CASE(testParseOptionsWithList)
+{
+    Address address("my-topic {a:[202, 'another string'], x:101}");
+    BOOST_CHECK_EQUAL(std::string("my-topic"), address.getName());
+    Variant::List& list = address.getOptions()["a"].asList();
+    Variant::List::const_iterator i = list.begin();
+    // REQUIRE rather than CHECK: the iterator is dereferenced next, which
+    // must not happen if it is already at end().
+    BOOST_REQUIRE(i != list.end());
+    BOOST_CHECK_EQUAL(static_cast<int64_t>(202), i->asInt64());
+    ++i;
+    BOOST_REQUIRE(i != list.end());
+    BOOST_CHECK_EQUAL(std::string("another string"), i->asString());
+    BOOST_CHECK_EQUAL(static_cast<int64_t>(101), address.getOption("x").asInt64());
+}
+
+QPID_AUTO_TEST_SUITE_END()
+
+}}
Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/tests/Makefile.am Wed Nov 11 13:15:44 2009
@@ -114,7 +114,8 @@
ReplicationTest.cpp \
ClientMessageTest.cpp \
PollableCondition.cpp \
- Variant.cpp
+ Variant.cpp \
+ Address.cpp
if HAVE_XML
unit_test_SOURCES+= XmlClientSessionTest.cpp
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org