You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2009/11/11 14:15:46 UTC

svn commit: r834869 [1/2] - in /qpid/trunk/qpid/cpp: examples/messaging/ include/qpid/messaging/ src/ src/qpid/client/amqp0_10/ src/qpid/messaging/ src/tests/

Author: gsim
Date: Wed Nov 11 13:15:44 2009
New Revision: 834869

URL: http://svn.apache.org/viewvc?rev=834869&view=rev
Log:
Added support for address parsing, create/assert/delete policies

Added:
    qpid/trunk/qpid/cpp/src/tests/Address.cpp
Removed:
    qpid/trunk/qpid/cpp/include/qpid/messaging/Filter.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/Filter.cpp
Modified:
    qpid/trunk/qpid/cpp/examples/messaging/client.cpp
    qpid/trunk/qpid/cpp/examples/messaging/topic_listener.cpp
    qpid/trunk/qpid/cpp/examples/messaging/topic_receiver.cpp
    qpid/trunk/qpid/cpp/include/qpid/messaging/Address.h
    qpid/trunk/qpid/cpp/include/qpid/messaging/Message.h
    qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h
    qpid/trunk/qpid/cpp/include/qpid/messaging/Variant.h
    qpid/trunk/qpid/cpp/src/CMakeLists.txt
    qpid/trunk/qpid/cpp/src/Makefile.am
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/Variant.cpp
    qpid/trunk/qpid/cpp/src/tests/Makefile.am
    qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp

Modified: qpid/trunk/qpid/cpp/examples/messaging/client.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/examples/messaging/client.cpp?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/examples/messaging/client.cpp (original)
+++ qpid/trunk/qpid/cpp/examples/messaging/client.cpp Wed Nov 11 13:15:44 2009
@@ -46,7 +46,7 @@
         Sender sender = session.createSender("service_queue");
 
         //create temp queue & receiver...
-        Address responseQueue = session.createTempQueue();
+        Address responseQueue("#response-queue {create:always, type:queue, node-properties:{x-amqp0-10-auto-delete:true}}");
         Receiver receiver = session.createReceiver(responseQueue);
 
 	// Now send some messages ...

Modified: qpid/trunk/qpid/cpp/examples/messaging/topic_listener.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/examples/messaging/topic_listener.cpp?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/examples/messaging/topic_listener.cpp (original)
+++ qpid/trunk/qpid/cpp/examples/messaging/topic_listener.cpp Wed Nov 11 13:15:44 2009
@@ -20,11 +20,11 @@
  */
 
 #include <qpid/messaging/Connection.h>
-#include <qpid/messaging/Filter.h>
 #include <qpid/messaging/Message.h>
 #include <qpid/messaging/MessageListener.h>
 #include <qpid/messaging/Session.h>
 #include <qpid/messaging/Receiver.h>
+#include <qpid/messaging/Variant.h>
 
 #include <cstdlib>
 #include <iostream>
@@ -57,15 +57,14 @@
 }
 
 int main(int argc, char** argv) {
-    const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
-    const char* pattern = argc>2 ? argv[2] : "#.#";
+    const std::string url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
+    const std::string pattern = argc>2 ? argv[2] : "#.#";
 
     try {
         Connection connection = Connection::open(url);
         Session session = connection.newSession();
 
-        Filter filter(Filter::WILDCARD, pattern, "control");
-        Receiver receiver = session.createReceiver("news_service", filter);
+        Receiver receiver = session.createReceiver("news_service {filter:[control, " + pattern + "]}");
         Listener listener(receiver);        
         receiver.setListener(&listener);
         receiver.setCapacity(1);
@@ -78,5 +77,3 @@
     }
     return 1;   
 }
-
-

Modified: qpid/trunk/qpid/cpp/examples/messaging/topic_receiver.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/examples/messaging/topic_receiver.cpp?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/examples/messaging/topic_receiver.cpp (original)
+++ qpid/trunk/qpid/cpp/examples/messaging/topic_receiver.cpp Wed Nov 11 13:15:44 2009
@@ -19,32 +19,25 @@
  *
  */
 
-#include <qpid/messaging/Address.h>
 #include <qpid/messaging/Connection.h>
-#include <qpid/messaging/Filter.h>
 #include <qpid/messaging/Message.h>
 #include <qpid/messaging/Receiver.h>
 #include <qpid/messaging/Session.h>
+#include <qpid/messaging/Variant.h>
 
 #include <cstdlib>
 #include <iostream>
 
-#include <sstream>
-
 using namespace qpid::messaging;
 
-using std::stringstream;
-using std::string;
-
 int main(int argc, char** argv) {
-    const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
-    const char* pattern = argc>2 ? argv[2] : "#.#";
+    const std::string url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
+    const std::string pattern = argc>2 ? argv[2] : "#.#";
 
     try {
         Connection connection = Connection::open(url);
         Session session = connection.newSession();
-        Filter filter(Filter::WILDCARD, pattern, "control");
-        Receiver receiver = session.createReceiver(Address("news_service", "topic"), filter);
+        Receiver receiver = session.createReceiver("news_service {filter:[control, " + pattern + "]}");
         while (true) {
             Message message = receiver.fetch();
             std::cout << "Message: " << message.getContent() << std::endl;

Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Address.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Address.h?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Address.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Address.h Wed Nov 11 13:15:44 2009
@@ -22,33 +22,71 @@
  *
  */
 #include <string>
+#include "qpid/Exception.h"
+#include "qpid/messaging/Variant.h"
 #include "qpid/client/ClientImportExport.h"
 #include <ostream>
 
 namespace qpid {
-namespace client {
-}
-
 namespace messaging {
 
+struct InvalidAddress : public qpid::Exception 
+{
+    InvalidAddress(const std::string& msg);
+};
+
+struct MalformedAddress : public qpid::Exception 
+{
+    MalformedAddress(const std::string& msg);
+};
+
+class AddressImpl;
+
 /**
  * Represents an address to which messages can be sent and from which
  * messages can be received. Often a simple name is sufficient for
- * this. However this struct allows the type of address to be
- * specified allowing more sophisticated treatment if necessary.
+ * this; however, it can be augmented with a subject pattern and
+ * options.
+ * 
+ * All parts of an address can be specified in a string of the
+ * following form:
+ * 
+ * <address> [ / <subject> ] [ { <key> : <value> , ... } ]
+ * 
+ * Here the <address> is a simple name for the addressed entity and
+ * <subject> is a subject or subject pattern for messages sent to or
+ * received from this address. The options are specified as a series
+ * of key value pairs enclosed in curly brackets (denoting a map).
  */
-struct Address
+class Address
 {
-    std::string value;
-    std::string type;
-
+  public:
     QPID_CLIENT_EXTERN Address();
     QPID_CLIENT_EXTERN Address(const std::string& address);
-    QPID_CLIENT_EXTERN Address(const std::string& address, const std::string& type);
-    QPID_CLIENT_EXTERN operator const std::string&() const;
-    QPID_CLIENT_EXTERN const std::string& toStr() const;
+    QPID_CLIENT_EXTERN Address(const std::string& name, const std::string& subject,
+                               const Variant::Map& options, const std::string& type = "");
+    QPID_CLIENT_EXTERN Address(const Address& address);
+    QPID_CLIENT_EXTERN ~Address();
+    Address& operator=(const Address&);
+    QPID_CLIENT_EXTERN const std::string& getName() const;
+    QPID_CLIENT_EXTERN void setName(const std::string&);
+    QPID_CLIENT_EXTERN const std::string& getSubject() const;
+    QPID_CLIENT_EXTERN void setSubject(const std::string&);
+    QPID_CLIENT_EXTERN bool hasSubject() const;
+    QPID_CLIENT_EXTERN const Variant::Map& getOptions() const;
+    QPID_CLIENT_EXTERN Variant::Map& getOptions();
+    QPID_CLIENT_EXTERN void setOptions(const Variant::Map&);
+
+    QPID_CLIENT_EXTERN std::string getType() const;
+    QPID_CLIENT_EXTERN void setType(const std::string&);
+
+    QPID_CLIENT_EXTERN const Variant& getOption(const std::string& key) const;
+
+    QPID_CLIENT_EXTERN std::string toStr() const;
     QPID_CLIENT_EXTERN operator bool() const;
     QPID_CLIENT_EXTERN bool operator !() const;
+  private:
+    AddressImpl* impl;
 };
 
 QPID_CLIENT_EXTERN std::ostream& operator<<(std::ostream& out, const Address& address);

Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Message.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Message.h?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Message.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Message.h Wed Nov 11 13:15:44 2009
@@ -32,7 +32,7 @@
 
 namespace messaging {
 
-struct Address;
+class Address;
 class Codec;
 struct MessageImpl;
 

Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h Wed Nov 11 13:15:44 2009
@@ -24,7 +24,7 @@
 #include "qpid/client/ClientImportExport.h"
 #include "qpid/client/Handle.h"
 #include "qpid/sys/Time.h"
-#include "Variant.h"
+#include <string>
 
 namespace qpid {
 namespace client {
@@ -35,8 +35,7 @@
 
 namespace messaging {
 
-struct Address;
-struct Filter;
+class Address;
 class Message;
 class MessageListener;
 class Sender;
@@ -90,13 +89,10 @@
     QPID_CLIENT_EXTERN Message fetch(qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE);
     QPID_CLIENT_EXTERN bool dispatch(qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE);
 
-    QPID_CLIENT_EXTERN Sender createSender(const Address& address, const VariantMap& options = VariantMap());
-    QPID_CLIENT_EXTERN Sender createSender(const std::string& address, const VariantMap& options = VariantMap());
-
-    QPID_CLIENT_EXTERN Receiver createReceiver(const Address& address, const VariantMap& options = VariantMap());
-    QPID_CLIENT_EXTERN Receiver createReceiver(const Address& address, const Filter& filter, const VariantMap& options = VariantMap());
-    QPID_CLIENT_EXTERN Receiver createReceiver(const std::string& address, const VariantMap& options = VariantMap());
-    QPID_CLIENT_EXTERN Receiver createReceiver(const std::string& address, const Filter& filter, const VariantMap& options = VariantMap());
+    QPID_CLIENT_EXTERN Sender createSender(const Address& address);
+    QPID_CLIENT_EXTERN Sender createSender(const std::string& address);
+    QPID_CLIENT_EXTERN Receiver createReceiver(const Address& address);
+    QPID_CLIENT_EXTERN Receiver createReceiver(const std::string& address);
 
     QPID_CLIENT_EXTERN Address createTempQueue(const std::string& baseName = std::string());
   private:

Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Variant.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Variant.h?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Variant.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Variant.h Wed Nov 11 13:15:44 2009
@@ -30,9 +30,6 @@
 #include "qpid/client/ClientImportExport.h"
 
 namespace qpid {
-namespace client {
-}
-
 namespace messaging {
 
 /**
@@ -93,6 +90,7 @@
     QPID_CLIENT_EXTERN ~Variant();
 
     QPID_CLIENT_EXTERN VariantType getType() const;
+    QPID_CLIENT_EXTERN bool isVoid() const;
     
     QPID_CLIENT_EXTERN Variant& operator=(bool);
     QPID_CLIENT_EXTERN Variant& operator=(uint8_t);

Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Wed Nov 11 13:15:44 2009
@@ -594,7 +594,6 @@
      qpid/messaging/Address.cpp
      qpid/messaging/Connection.cpp
      qpid/messaging/ConnectionImpl.h
-     qpid/messaging/Filter.cpp
      qpid/messaging/ListContent.cpp
      qpid/messaging/ListView.cpp
      qpid/messaging/MapContent.cpp

Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Wed Nov 11 13:15:44 2009
@@ -686,7 +686,6 @@
   qpid/client/SubscriptionManagerImpl.h		\
   qpid/messaging/Address.cpp			\
   qpid/messaging/Connection.cpp			\
-  qpid/messaging/Filter.cpp			\
   qpid/messaging/ListContent.cpp		\
   qpid/messaging/ListView.cpp			\
   qpid/messaging/MapContent.cpp			\
@@ -795,7 +794,6 @@
   ../include/qpid/messaging/Address.h 		\
   ../include/qpid/messaging/Connection.h 	\
   ../include/qpid/messaging/Codec.h 	        \
-  ../include/qpid/messaging/Filter.h 		\
   ../include/qpid/messaging/ListContent.h 	\
   ../include/qpid/messaging/ListView.h 		\
   ../include/qpid/messaging/MapContent.h 	\

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp Wed Nov 11 13:15:44 2009
@@ -20,18 +20,24 @@
  */
 #include "qpid/client/amqp0_10/AddressResolution.h"
 #include "qpid/client/amqp0_10/Codecs.h"
+#include "qpid/client/amqp0_10/CodecsInternal.h"
 #include "qpid/client/amqp0_10/MessageSource.h"
 #include "qpid/client/amqp0_10/MessageSink.h"
 #include "qpid/client/amqp0_10/OutgoingMessage.h"
 #include "qpid/messaging/Address.h"
-#include "qpid/messaging/Filter.h"
 #include "qpid/messaging/Message.h"
+#include "qpid/messaging/Variant.h"
 #include "qpid/Exception.h"
 #include "qpid/log/Statement.h"
 #include "qpid/framing/enum.h"
+#include "qpid/framing/ExchangeQueryResult.h"
 #include "qpid/framing/FieldTable.h"
+#include "qpid/framing/QueueQueryResult.h"
 #include "qpid/framing/ReplyTo.h"
 #include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/Uuid.h"
+#include <boost/assign.hpp>
+#include <boost/format.hpp>
 
 namespace qpid {
 namespace client {
@@ -40,61 +46,145 @@
 using qpid::Exception;
 using qpid::messaging::Address;
 using qpid::messaging::Filter;
+using qpid::messaging::InvalidAddress;
 using qpid::messaging::Variant;
+using qpid::framing::ExchangeQueryResult;
 using qpid::framing::FieldTable;
+using qpid::framing::QueueQueryResult;
 using qpid::framing::ReplyTo;
+using qpid::framing::Uuid;
 using namespace qpid::framing::message;
+using namespace boost::assign;
 
 
 namespace{
-const Variant EMPTY_VARIANT;
 const FieldTable EMPTY_FIELD_TABLE;
 const std::string EMPTY_STRING;
 
 //option names
 const std::string BROWSE("browse");
 const std::string EXCLUSIVE("exclusive");
-const std::string MODE("mode");
-const std::string NAME("name");
-const std::string UNACKNOWLEDGED("unacknowledged");
+const std::string NO_LOCAL("no-local");
+const std::string FILTER("filter");
+const std::string RELIABILITY("reliability");
+const std::string NAME("subscription-name");
+const std::string NODE_PROPERTIES("node-properties");
+
+//policy types
+const std::string CREATE("create");
+const std::string ASSERT("assert");
+const std::string DELETE("delete");
+//policy values
+const std::string ALWAYS("always");
+const std::string NEVER("never");
+const std::string RECEIVER("receiver");
+const std::string SENDER("sender");
 
 const std::string QUEUE_ADDRESS("queue");
 const std::string TOPIC_ADDRESS("topic");
-const std::string TOPIC_ADDRESS_AND_SUBJECT("topic+");
-const std::string DIVIDER("/");
 
-const std::string SIMPLE_SUBSCRIPTION("simple");
-const std::string RELIABLE_SUBSCRIPTION("reliable");
+const std::string UNRELIABLE("unreliable");
+const std::string AT_MOST_ONCE("at-most-once");
+const std::string AT_LEAST_ONCE("at-least-once");
+const std::string EXACTLY_ONCE("exactly-once");
 const std::string DURABLE_SUBSCRIPTION("durable");
+const std::string DURABLE("durable");
+
+const std::string TOPIC_EXCHANGE("topic");
+const std::string FANOUT_EXCHANGE("fanout");
+const std::string DIRECT_EXCHANGE("direct");
+const std::string HEADERS_EXCHANGE("headers");
+const std::string XML_EXCHANGE("xml");
+const std::string WILDCARD_ANY("*");
+}
+
+//some amqp 0-10 specific options
+namespace xamqp{
+const std::string AUTO_DELETE("x-amqp0-10-auto-delete");
+const std::string EXCHANGE_TYPE("x-amqp0-10-exchange-type");
+const std::string EXCLUSIVE("x-amqp0-10-exclusive");
+const std::string ALTERNATE_EXCHANGE("x-amqp0-10-alternate-exchange");
+const std::string ARGUMENTS("x-amqp0-10-arguments");
+const std::string QUEUE_ARGUMENTS("x-amqp0-10-queue-arguments");
+const std::string SUBSCRIBE_ARGUMENTS("x-amqp0-10-queue-arguments");
 }
 
-class QueueSource : public MessageSource
+class Node
+{
+  protected:
+    enum CheckMode {FOR_RECEIVER, FOR_SENDER};
+
+    Node(const Address& address);
+
+    const std::string name;
+    Variant createPolicy;
+    Variant assertPolicy;
+    Variant deletePolicy;
+
+    static bool enabled(const Variant& policy, CheckMode mode);
+    static bool createEnabled(const Address& address, CheckMode mode);
+    static void convert(const Variant& option, FieldTable& arguments);
+    static std::vector<std::string> RECEIVER_MODES;
+    static std::vector<std::string> SENDER_MODES;
+};
+
+class Queue : protected Node
 {
   public:
-    QueueSource(const std::string& name, AcceptMode=ACCEPT_MODE_EXPLICIT, AcquireMode=ACQUIRE_MODE_PRE_ACQUIRED, 
-                bool exclusive = false, const FieldTable& options = EMPTY_FIELD_TABLE);
+    Queue(const Address& address);
+  protected:
+    void checkCreate(qpid::client::AsyncSession&, CheckMode);
+    void checkAssert(qpid::client::AsyncSession&, CheckMode);
+    void checkDelete(qpid::client::AsyncSession&, CheckMode);
+  private:
+    bool durable;
+    bool autoDelete;
+    bool exclusive;
+    std::string alternateExchange;
+    FieldTable arguments;    
+
+    void configure(const Address&);
+};
+
+class Exchange : protected Node
+{
+  public:
+    Exchange(const Address& address);
+  protected:
+    void checkCreate(qpid::client::AsyncSession&, CheckMode);
+    void checkAssert(qpid::client::AsyncSession&, CheckMode);
+    void checkDelete(qpid::client::AsyncSession&, CheckMode);
+    const std::string& getDesiredExchangeType() { return type; }
+
+  private:
+    std::string type;
+    bool durable;
+    bool autoDelete;
+    std::string alternateExchange;
+    FieldTable arguments;    
+
+    void configure(const Address&);
+};
+
+class QueueSource : public Queue, public MessageSource
+{
+  public:
+    QueueSource(const Address& address);
     void subscribe(qpid::client::AsyncSession& session, const std::string& destination);
     void cancel(qpid::client::AsyncSession& session, const std::string& destination);
   private:
-    const std::string name;
     const AcceptMode acceptMode;
     const AcquireMode acquireMode;
     const bool exclusive;
-    const FieldTable options;
+    FieldTable options;
 };
 
-class Subscription : public MessageSource
+class Subscription : public Exchange, public MessageSource
 {
   public:
-    enum SubscriptionMode {SIMPLE, RELIABLE, DURABLE};
-
-    Subscription(const std::string& name, SubscriptionMode mode = SIMPLE,
-                 const FieldTable& queueOptions = EMPTY_FIELD_TABLE, const FieldTable& subscriptionOptions = EMPTY_FIELD_TABLE);
-    void add(const std::string& exchange, const std::string& key, const FieldTable& options = EMPTY_FIELD_TABLE);
+    Subscription(const Address&, const std::string& exchangeType);
     void subscribe(qpid::client::AsyncSession& session, const std::string& destination);
     void cancel(qpid::client::AsyncSession& session, const std::string& destination);
-
-    static SubscriptionMode getMode(const std::string& mode);
   private:
     struct Binding
     {
@@ -107,155 +197,138 @@
 
     typedef std::vector<Binding> Bindings;
 
-    const std::string name;
-    const bool autoDelete;
+    const std::string queue;
+    const bool reliable;
     const bool durable;
-    const FieldTable queueOptions;
-    const FieldTable subscriptionOptions;
+    FieldTable queueOptions;
+    FieldTable subscriptionOptions;
     Bindings bindings;
-    std::string queue;
+    
+    void bindSpecial(const std::string& exchangeType);
+    void bind(const Variant& filter);
+    void bind(const Variant::Map& filter);
+    void bind(const Variant::List& filter);
+    void add(const std::string& exchange, const std::string& key, const FieldTable& options = EMPTY_FIELD_TABLE);
+    static std::string getSubscriptionName(const std::string& base, const Variant& name);
 };
 
-class Exchange : public MessageSink
+class ExchangeSink : public Exchange, public MessageSink
 {
   public:
-    Exchange(const std::string& name, const std::string& defaultSubject = EMPTY_STRING, 
-             bool passive = true, const std::string& type = EMPTY_STRING, bool durable = false, 
-             const FieldTable& options = EMPTY_FIELD_TABLE);
+    ExchangeSink(const Address& name);
     void declare(qpid::client::AsyncSession& session, const std::string& name);
     void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message);
     void cancel(qpid::client::AsyncSession& session, const std::string& name);
   private:
-    const std::string name;
     const std::string defaultSubject;
-    const bool passive;
-    const std::string type;
-    const bool durable;
-    const FieldTable options;
 };
 
-class QueueSink : public MessageSink
+class QueueSink : public Queue, public MessageSink
 {
   public:
-    QueueSink(const std::string& name, bool passive=true, bool exclusive=false, 
-              bool autoDelete=false, bool durable=false, const FieldTable& options = EMPTY_FIELD_TABLE);
+    QueueSink(const Address& name);
     void declare(qpid::client::AsyncSession& session, const std::string& name);
     void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message);
     void cancel(qpid::client::AsyncSession& session, const std::string& name);
   private:
-    const std::string name;
-    const bool passive;
-    const bool exclusive;
-    const bool autoDelete;
-    const bool durable;
-    const FieldTable options;
 };
 
 
 bool isQueue(qpid::client::Session session, const qpid::messaging::Address& address);
-bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address, std::string& subject);
+bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address);
+
+bool in(const Variant& value, const std::vector<std::string>& choices)
+{
+    if (!value.isVoid()) {
+        for (std::vector<std::string>::const_iterator i = choices.begin(); i != choices.end(); ++i) {
+            if (value.asString() == *i) return true;
+        }
+    }
+    return false;
+}
+
+bool getReceiverPolicy(const Address& address, const std::string& key)
+{
+    return in(address.getOption(key), list_of<std::string>(ALWAYS)(RECEIVER));
+}
+
+bool getSenderPolicy(const Address& address, const std::string& key)
+{
+    return in(address.getOption(key), list_of<std::string>(ALWAYS)(SENDER));
+}
 
-const Variant& getOption(const std::string& key, const Variant::Map& options)
+bool is_unreliable(const Address& address)
 {
-    Variant::Map::const_iterator i = options.find(key);
-    if (i == options.end()) return EMPTY_VARIANT;
-    else return i->second;
+    return in(address.getOption(RELIABILITY), list_of<std::string>(UNRELIABLE)(AT_MOST_ONCE));
+}
+
+bool is_reliable(const Address& address)
+{
+    return in(address.getOption(RELIABILITY), list_of<std::string>(AT_LEAST_ONCE)(EXACTLY_ONCE));
 }
 
 std::auto_ptr<MessageSource> AddressResolution::resolveSource(qpid::client::Session session,
-                                                              const Address& address, 
-                                                              const Filter* filter,
-                                                              const Variant::Map& options)
+                                                              const Address& address)
 {
     //TODO: handle case where there exists a queue and an exchange of
     //the same name (hence an unqualified address is ambiguous)
     
     //TODO: make sure specified address type gives sane error message
-    //if it does npt match the configuration on server
+    //if it does not match the configuration on server
 
-    if (isQueue(session, address)) {
-        //TODO: support auto-created queue as source, if requested by specific option
-
-        AcceptMode accept = getOption(UNACKNOWLEDGED, options).asBool() ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT;
-        AcquireMode acquire = getOption(BROWSE, options).asBool() ? ACQUIRE_MODE_NOT_ACQUIRED : ACQUIRE_MODE_PRE_ACQUIRED;
-        bool exclusive = getOption(EXCLUSIVE, options).asBool();
-        FieldTable arguments;
-        //TODO: extract subscribe arguments from options (e.g. either
-        //filter out already processed keys and send the rest, or have
-        //a nested map)
+    /**
+    if (Node::createEnabled(address, FOR_RECEIVER)) {
+    } else {
+    }
+    **/
 
-        std::auto_ptr<MessageSource> source = 
-            std::auto_ptr<MessageSource>(new QueueSource(address.value, accept, acquire, exclusive, arguments));
+    if (isQueue(session, address)) {
+        std::auto_ptr<MessageSource> source(new QueueSource(address));
+        QPID_LOG(debug, "resolved source address as queue: " << address);
         return source;
     } else {
-        //TODO: extract queue options (e.g. no-local) and subscription options (e.g. less important)
-        std::auto_ptr<Subscription> bindings = 
-            std::auto_ptr<Subscription>(new Subscription(getOption(NAME, options).asString(), 
-                                                         Subscription::getMode(getOption(MODE, options).asString())));
-
-        qpid::framing::ExchangeQueryResult result = session.exchangeQuery(address.value);
-        if (result.getNotFound()) {
-            throw qpid::framing::NotFoundException(QPID_MSG("Address not known: " << address));
-        } else if (result.getType() == "topic") {
-            if (filter) {
-                if (filter->type != Filter::WILDCARD) {
-                    throw qpid::framing::NotImplementedException(
-                        QPID_MSG("Filters of type " << filter->type << " not supported by address " << address));
-                    
-                }
-                for (std::vector<std::string>::const_iterator i = filter->patterns.begin(); i != filter->patterns.end(); i++) {
-                    bindings->add(address.value, *i, qpid::framing::FieldTable());
-                }
-            } else {
-                //default is to receive all messages
-                bindings->add(address.value, "*", qpid::framing::FieldTable());
-            }
-        } else if (result.getType() == "fanout") {
-            if (filter) {
-                throw qpid::framing::NotImplementedException(QPID_MSG("Filters are not supported by address " << address));                
-            }            
-            bindings->add(address.value, address.value, qpid::framing::FieldTable());
-        } else if (result.getType() == "direct") {
-            //TODO: ????
-        } else {
-            //TODO: xml and headers exchanges
-            throw qpid::framing::NotImplementedException(QPID_MSG("Address type not recognised for " << address));                
-        }
-        std::auto_ptr<MessageSource> source = std::auto_ptr<MessageSource>(bindings.release());
+        qpid::framing::ExchangeQueryResult result = session.exchangeQuery(address.getName());
+        std::auto_ptr<MessageSource> source(new Subscription(address, result.getType()));
+        QPID_LOG(debug, "resolved source address as topic: " << address);
         return source;
     }
 }
 
 
 std::auto_ptr<MessageSink> AddressResolution::resolveSink(qpid::client::Session session,
-                                                          const qpid::messaging::Address& address, 
-                                                          const qpid::messaging::Variant::Map& /*options*/)
+                                                          const qpid::messaging::Address& address)
 {
     std::auto_ptr<MessageSink> sink;
     if (isQueue(session, address)) {
-        //TODO: support for auto-created queues as sink
-        sink = std::auto_ptr<MessageSink>(new QueueSink(address.value));
+        sink = std::auto_ptr<MessageSink>(new QueueSink(address));
     } else {
-        std::string subject;
-        if (isTopic(session, address, subject)) {
-            //TODO: support for auto-created exchanges as sink
-            sink = std::auto_ptr<MessageSink>(new Exchange(address.value, subject));
+        if (isTopic(session, address)) {
+            sink = std::auto_ptr<MessageSink>(new ExchangeSink(address));
         } else {
-            if (address.type.empty()) {
-                throw qpid::framing::NotFoundException(QPID_MSG("Address not known: " << address));
+            if (address.getType().empty()) {
+                throw InvalidAddress(QPID_MSG("Address not known: " << address));
             } else {
-                throw qpid::framing::NotImplementedException(QPID_MSG("Address type not recognised: " << address.type));
+                throw InvalidAddress(QPID_MSG("Address type not recognised: " << address.getType()));
             }
         }
     }
     return sink;
 }
 
-QueueSource::QueueSource(const std::string& _name, AcceptMode _acceptMode, AcquireMode _acquireMode, bool _exclusive, const FieldTable& _options) :
-    name(_name), acceptMode(_acceptMode), acquireMode(_acquireMode), exclusive(_exclusive), options(_options) {}
+QueueSource::QueueSource(const Address& address) :
+    Queue(address),
+    acceptMode(is_unreliable(address) ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT),
+    acquireMode(address.getOption(BROWSE).asBool() ? ACQUIRE_MODE_NOT_ACQUIRED : ACQUIRE_MODE_PRE_ACQUIRED),
+    exclusive(address.getOption(EXCLUSIVE).asBool())
+{
+    //extract subscription arguments from address options
+    convert(address.getOption(xamqp::SUBSCRIBE_ARGUMENTS), options);
+}
 
 void QueueSource::subscribe(qpid::client::AsyncSession& session, const std::string& destination)
 {
+    checkCreate(session, FOR_RECEIVER);
+    checkAssert(session, FOR_RECEIVER);
     session.messageSubscribe(arg::queue=name, 
                              arg::destination=destination,
                              arg::acceptMode=acceptMode,
@@ -267,11 +340,48 @@
 void QueueSource::cancel(qpid::client::AsyncSession& session, const std::string& destination)
 {
     session.messageCancel(destination);
+    checkDelete(session, FOR_RECEIVER);
+}
+
+std::string Subscription::getSubscriptionName(const std::string& base, const Variant& name)
+{
+    if (name.isVoid()) {
+        return (boost::format("%1%_%2%") % base % Uuid(true).str()).str();
+    } else {
+        return (boost::format("%1%_%2%") % base % name.asString()).str();
+    }
 }
 
-Subscription::Subscription(const std::string& _name, SubscriptionMode mode, const FieldTable& qOptions, const FieldTable& sOptions)
-    : name(_name), autoDelete(mode == SIMPLE), durable(mode == DURABLE), 
-      queueOptions(qOptions), subscriptionOptions(sOptions) {}
+Subscription::Subscription(const Address& address, const std::string& exchangeType)
+    : Exchange(address),
+      queue(getSubscriptionName(name, address.getOption(NAME))),
+      reliable(is_reliable(address)),
+      durable(address.getOption(DURABLE_SUBSCRIPTION).asBool())
+{
+    if (address.getOption(NO_LOCAL).asBool()) queueOptions.setInt(NO_LOCAL, 1);
+    convert(address.getOption(xamqp::QUEUE_ARGUMENTS), queueOptions);
+    convert(address.getOption(xamqp::SUBSCRIBE_ARGUMENTS), subscriptionOptions);
+
+    const Variant& filter = address.getOption(FILTER);
+    if (!filter.isVoid()) {
+        //TODO: if both subject _and_ filter are specified,
+        //combine in some way; for now we just ignore the
+        //subject in that case.
+        bind(filter);
+    } else if (address.hasSubject()) {
+        //Note: This will not work for headers- or xml- exchange;
+        //fanout exchange will do no filtering.
+        //TODO: for headers- or xml- exchange can construct a match
+        //for the subject in the application-headers
+        bind(address.getSubject());
+    } else {
+        //Neither a subject nor a filter has been defined, treat this
+        //as wanting to match all messages (Note: direct exchange is
+        //currently unable to support this case).
+        if (!exchangeType.empty()) bindSpecial(exchangeType);
+        else if (!getDesiredExchangeType().empty()) bindSpecial(getDesiredExchangeType());
+    }
+}
 
 void Subscription::add(const std::string& exchange, const std::string& key, const FieldTable& options)
 {
@@ -280,18 +390,19 @@
 
 void Subscription::subscribe(qpid::client::AsyncSession& session, const std::string& destination)
 {
-    if (name.empty()) {
-        //TODO: use same scheme as JMS client for subscription queue name generation?
-        queue = session.getId().getName() + destination;
-    } else {
-        queue = name;
-    }
+    //create exchange if required and specified by policy:
+    checkCreate(session, FOR_RECEIVER);
+    checkAssert(session, FOR_RECEIVER);
+
+    //create subscription queue:
     session.queueDeclare(arg::queue=queue, arg::exclusive=true, 
-                         arg::autoDelete=autoDelete, arg::durable=durable, arg::arguments=queueOptions);
+                         arg::autoDelete=!reliable, arg::durable=durable, arg::arguments=queueOptions);
+    //bind subscription queue to exchange:
     for (Bindings::const_iterator i = bindings.begin(); i != bindings.end(); ++i) {
         session.exchangeBind(arg::queue=queue, arg::exchange=i->exchange, arg::bindingKey=i->key, arg::arguments=i->options);
     }
-    AcceptMode accept = autoDelete ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT;
+    //subscribe to subscription queue:
+    AcceptMode accept = reliable ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE;
     session.messageSubscribe(arg::queue=queue, arg::destination=destination, 
                              arg::exclusive=true, arg::acceptMode=accept, arg::arguments=subscriptionOptions);
 }
@@ -300,36 +411,23 @@
 {
     session.messageCancel(destination);
     session.queueDelete(arg::queue=queue);
+    checkDelete(session, FOR_RECEIVER);
 }
 
 Subscription::Binding::Binding(const std::string& e, const std::string& k, const FieldTable& o):
     exchange(e), key(k), options(o) {}
 
-Subscription::SubscriptionMode Subscription::getMode(const std::string& s)
-{
-    if (s.empty() || s == SIMPLE_SUBSCRIPTION) return SIMPLE;
-    else if (s == RELIABLE_SUBSCRIPTION) return RELIABLE;
-    else if (s == DURABLE_SUBSCRIPTION) return DURABLE;
-    else throw Exception(QPID_MSG("Unrecognised subscription mode: " << s)); 
-}
-
 void convert(qpid::messaging::Message& from, qpid::client::Message& to);
 
-Exchange::Exchange(const std::string& _name, const std::string& _defaultSubject, 
-         bool _passive, const std::string& _type, bool _durable, const FieldTable& _options) : 
-    name(_name), defaultSubject(_defaultSubject), passive(_passive), type(_type), durable(_durable), options(_options) {}
-
-void Exchange::declare(qpid::client::AsyncSession& session, const std::string&)
-{
-    //TODO: should this really by synchronous? want to get error if not valid...
-    if (passive) {
-        sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true);
-    } else {
-        sync(session).exchangeDeclare(arg::exchange=name, arg::type=type, arg::durable=durable, arg::arguments=options);
-    }
+ExchangeSink::ExchangeSink(const Address& address) : Exchange(address), defaultSubject(address.getSubject()) {}
+
+void ExchangeSink::declare(qpid::client::AsyncSession& session, const std::string&)
+{
+    checkCreate(session, FOR_SENDER);
+    checkAssert(session, FOR_SENDER);
 }
 
-void Exchange::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
+void ExchangeSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
 {
     if (m.message.getDeliveryProperties().getRoutingKey().empty() && !defaultSubject.empty()) {
         m.message.getDeliveryProperties().setRoutingKey(defaultSubject);
@@ -337,22 +435,17 @@
     m.status = session.messageTransfer(arg::destination=name, arg::content=m.message);
 }
 
-void Exchange::cancel(qpid::client::AsyncSession&, const std::string&) {}
+void ExchangeSink::cancel(qpid::client::AsyncSession& session, const std::string&)
+{
+    checkDelete(session, FOR_SENDER);    
+}
 
-QueueSink::QueueSink(const std::string& _name, bool _passive, bool _exclusive, 
-                     bool _autoDelete, bool _durable, const FieldTable& _options) :
-    name(_name), passive(_passive), exclusive(_exclusive), 
-    autoDelete(_autoDelete), durable(_durable), options(_options) {}
+QueueSink::QueueSink(const Address& address) : Queue(address) {}
 
 void QueueSink::declare(qpid::client::AsyncSession& session, const std::string&)
 {
-    //TODO: should this really by synchronous?
-    if (passive) {
-        sync(session).queueDeclare(arg::queue=name, arg::passive=true);
-    } else {
-        sync(session).queueDeclare(arg::queue=name, arg::exclusive=exclusive, arg::durable=durable, 
-                                   arg::autoDelete=autoDelete, arg::arguments=options);
-    }
+    checkCreate(session, FOR_SENDER);
+    checkAssert(session, FOR_SENDER);
 }
 void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
 {
@@ -360,9 +453,10 @@
     m.status = session.messageTransfer(arg::content=m.message);
 }
 
-void QueueSink::cancel(qpid::client::AsyncSession&, const std::string&) {}
-
-void translate(const Variant::Map& from, FieldTable& to);//implementation in Codecs.cpp
+void QueueSink::cancel(qpid::client::AsyncSession& session, const std::string&)
+{
+    checkDelete(session, FOR_SENDER);
+}
 
 void convert(qpid::messaging::Message& from, qpid::client::Message& to)
 {
@@ -372,7 +466,7 @@
     //TODO: set other delivery properties
     to.getMessageProperties().setContentType(from.getContentType());
     const Address& address = from.getReplyTo();
-    if (!address.value.empty()) {
+    if (!address.getName().empty()) {
         to.getMessageProperties().setReplyTo(AddressResolution::convert(address));
     }
     translate(from.getHeaders(), to.getMessageProperties().getApplicationHeaders());
@@ -381,72 +475,292 @@
 
 Address AddressResolution::convert(const qpid::framing::ReplyTo& rt)
 {
-    if (rt.getExchange().empty()) {
-        if (rt.getRoutingKey().empty()) {
-            return Address();//empty address
-        } else {
-            return Address(rt.getRoutingKey(), QUEUE_ADDRESS);
-        }
+    Address address;
+    if (rt.getExchange().empty()) {//if default exchange, treat as queue
+        address.setName(rt.getRoutingKey());
+        address.setType(QUEUE_ADDRESS);
     } else {
-        if (rt.getRoutingKey().empty()) {
-            return Address(rt.getExchange(), TOPIC_ADDRESS);
-        } else {
-            return Address(rt.getExchange() + DIVIDER + rt.getRoutingKey(), TOPIC_ADDRESS_AND_SUBJECT);
-        }
-    }    
+        address.setName(rt.getExchange());
+        address.setSubject(rt.getRoutingKey());
+        address.setType(TOPIC_ADDRESS);
+    }
+    return address;
 }
 
 qpid::framing::ReplyTo AddressResolution::convert(const Address& address)
 {
-    if (address.type == QUEUE_ADDRESS || address.type.empty()) {
-        return ReplyTo(EMPTY_STRING, address.value);
-    } else if (address.type == TOPIC_ADDRESS) {
-        return ReplyTo(address.value, EMPTY_STRING);
-    } else if (address.type == TOPIC_ADDRESS_AND_SUBJECT) {
-        //need to split the value
-        string::size_type i = address.value.find(DIVIDER);
-        if (i != string::npos) {
-            std::string exchange = address.value.substr(0, i);
-            std::string routingKey;
-            if (i+1 < address.value.size()) {
-                routingKey = address.value.substr(i+1);
-            } 
-            return ReplyTo(exchange, routingKey);
-        } else {
-            return ReplyTo(address.value, EMPTY_STRING);
-        }
+    if (address.getType() == QUEUE_ADDRESS || address.getType().empty()) {
+        return ReplyTo(EMPTY_STRING, address.getName());
+    } else if (address.getType() == TOPIC_ADDRESS) {
+        return ReplyTo(address.getName(), address.getSubject());
     } else {
-        QPID_LOG(notice, "Unrecognised type for reply-to: " << address.type);
-        //treat as queue
-        return ReplyTo(EMPTY_STRING, address.value);
+        QPID_LOG(notice, "Unrecognised type for reply-to: " << address.getType());
+        return ReplyTo(EMPTY_STRING, address.getName());//treat as queue
     }
 }
 
 bool isQueue(qpid::client::Session session, const qpid::messaging::Address& address) 
 {
-    return address.type == QUEUE_ADDRESS || 
-        (address.type.empty() && session.queueQuery(address.value).getQueue() == address.value);
+    return address.getType() == QUEUE_ADDRESS || 
+        (address.getType().empty() && session.queueQuery(address.getName()).getQueue() == address.getName());
 }
 
-bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address, std::string& subject)
+bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address)
 {
-    if (address.type.empty()) {
-        return !session.exchangeQuery(address.value).getNotFound();
-    } else if (address.type == TOPIC_ADDRESS) {
-        return true;
-    } else if (address.type == TOPIC_ADDRESS_AND_SUBJECT) {
-        string::size_type i = address.value.find(DIVIDER);
-        if (i != string::npos) {
-            std::string exchange = address.value.substr(0, i);
-            if (i+1 < address.value.size()) {
-                subject = address.value.substr(i+1);
-            } 
-        }
+    if (address.getType().empty()) {
+        return !session.exchangeQuery(address.getName()).getNotFound();
+    } else if (address.getType() == TOPIC_ADDRESS) {
         return true;
     } else {
         return false;
     }
 }
 
+void Subscription::bind(const Variant& filter)
+{
+    switch (filter.getType()) {
+      case qpid::messaging::VAR_MAP:
+        bind(filter.asMap());
+        break;
+      case qpid::messaging::VAR_LIST:
+        bind(filter.asList());
+        break;
+      default:
+        add(name, filter.asString());
+        break;
+    }
+}
+
+void Subscription::bind(const Variant::Map& filter)
+{
+    qpid::framing::FieldTable arguments;
+    translate(filter, arguments);
+    add(name, queue, arguments);
+}
+
+void Subscription::bind(const Variant::List& filter)
+{
+    for (Variant::List::const_iterator i = filter.begin(); i != filter.end(); ++i) {
+        bind(*i);
+    }
+}
+
+void Subscription::bindSpecial(const std::string& exchangeType)
+{
+    if (exchangeType == TOPIC_EXCHANGE) {
+        add(name, WILDCARD_ANY);        
+    } else if (exchangeType == FANOUT_EXCHANGE) {
+        add(name, queue);        
+    } else if (exchangeType == HEADERS_EXCHANGE) {
+        //TODO: add special binding for headers exchange to match all messages
+    } else if (exchangeType == XML_EXCHANGE) {
+        //TODO: add special binding for xml exchange to match all messages
+    } else { //E.g. direct
+        throw qpid::Exception(QPID_MSG("Cannot create binding to match all messages for exchange of type " << exchangeType));
+    }
+}
+
+Node::Node(const Address& address) : name(address.getName()),
+                                     createPolicy(address.getOption(CREATE)),
+                                     assertPolicy(address.getOption(ASSERT)),
+                                     deletePolicy(address.getOption(DELETE)) {}
+
+Queue::Queue(const Address& a) : Node(a),
+                                 durable(false),
+                                 autoDelete(false),
+                                 exclusive(false)
+{
+    configure(a);
+}
+
+void Queue::checkCreate(qpid::client::AsyncSession& session, CheckMode mode)
+{
+    if (enabled(createPolicy, mode)) {
+        QPID_LOG(debug, "Auto-creating queue '" << name << "'");
+        try {            
+            sync(session).queueDeclare(arg::queue=name,
+                                       arg::durable=durable,
+                                       arg::autoDelete=autoDelete,
+                                       arg::exclusive=exclusive,
+                                       arg::alternateExchange=alternateExchange,
+                                       arg::arguments=arguments);
+        } catch (const qpid::Exception& e) {
+            throw InvalidAddress((boost::format("Could not create queue %1%; %2%") % name % e.what()).str());
+        }
+    } else {
+        try {
+            sync(session).queueDeclare(arg::queue=name, arg::passive=true);
+        } catch (const qpid::Exception& e) {
+            throw InvalidAddress((boost::format("Queue %1% does not exist; %2%") % name % e.what()).str());
+        }
+    }
+}
+
+void Queue::checkDelete(qpid::client::AsyncSession& session, CheckMode mode)
+{
+    if (enabled(deletePolicy, mode)) {
+        QPID_LOG(debug, "Auto-deleting queue '" << name << "'");
+        sync(session).queueDelete(arg::queue=name);
+    }
+}
+
+void Queue::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
+{
+    if (enabled(assertPolicy, mode)) {
+        QueueQueryResult result = sync(session).queueQuery(name);
+        if (result.getQueue() != name) {
+            throw InvalidAddress((boost::format("Queue not found: %1%") % name).str());
+        } else {
+            if (durable && !result.getDurable()) {
+                throw InvalidAddress((boost::format("Queue not durable: %1%") % name).str());
+            }
+            if (autoDelete && !result.getAutoDelete()) {
+                throw InvalidAddress((boost::format("Queue not set to auto-delete: %1%") % name).str());
+            }
+            if (exclusive && !result.getExclusive()) {
+                throw InvalidAddress((boost::format("Queue not exclusive: %1%") % name).str());
+            }
+            if (!alternateExchange.empty() && result.getAlternateExchange() != alternateExchange) {
+                throw InvalidAddress((boost::format("Alternate exchange does not match for %1%, expected %2%, got %3%") 
+                                      % name % alternateExchange % result.getAlternateExchange()).str());
+            }
+            for (FieldTable::ValueMap::const_iterator i = arguments.begin(); i != arguments.end(); ++i) {
+                FieldTable::ValuePtr v = result.getArguments().get(i->first);
+                if (!v) {
+                    throw InvalidAddress((boost::format("Option %1% not set for %2%") % i->first % name).str());
+                } else if (*i->second != *v) {
+                    throw InvalidAddress((boost::format("Option %1% does not match for %2%, expected %3%, got %4%")
+                                          % i->first % name % *(i->second) % *v).str());
+                }
+            }
+        }
+    }
+}
+
+void Queue::configure(const Address& address)
+{
+    const Variant& properties = address.getOption(NODE_PROPERTIES);
+    if (!properties.isVoid()) {
+        Variant::Map p = properties.asMap();
+        durable = p[DURABLE];
+        autoDelete = p[xamqp::AUTO_DELETE];
+        exclusive = p[xamqp::EXCLUSIVE];
+        alternateExchange = p[xamqp::ALTERNATE_EXCHANGE].asString();
+        if (!p[xamqp::ARGUMENTS].isVoid()) {
+            translate(p[xamqp::ARGUMENTS].asMap(), arguments);
+        }
+    }
+}
+
+Exchange::Exchange(const Address& a) : Node(a),
+                                       durable(false),
+                                       autoDelete(false)
+{
+    configure(a);
+}
+
+void Exchange::checkCreate(qpid::client::AsyncSession& session, CheckMode mode)
+{
+    if (enabled(createPolicy, mode)) {
+        try {
+            sync(session).exchangeDeclare(arg::exchange=name,
+                                      arg::type=type,
+                                      arg::durable=durable,
+                                      arg::autoDelete=autoDelete,
+                                      arg::alternateExchange=alternateExchange,
+                                      arg::arguments=arguments);
+        } catch (const qpid::Exception& e) {
+            throw InvalidAddress((boost::format("Could not create exchange %1%; %2%") % name % e.what()).str());
+        }
+    } else {
+        try {
+            sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true);
+        } catch (const qpid::Exception& e) {
+            throw InvalidAddress((boost::format("Exchange %1% does not exist; %2%") % name % e.what()).str());
+        }
+    }
+}
+
+void Exchange::checkDelete(qpid::client::AsyncSession& session, CheckMode mode)
+{
+    if (enabled(deletePolicy, mode)) {
+        sync(session).exchangeDelete(arg::exchange=name);
+    }
+}
+
+void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
+{
+    if (enabled(assertPolicy, mode)) { //nothing is checked unless the assert policy covers this mode
+        ExchangeQueryResult result = sync(session).exchangeQuery(arg::exchange=name);
+        if (result.getNotFound()) {
+            throw InvalidAddress((boost::format("Exchange not found: %1%") % name).str());
+        } else {
+            if (!type.empty() && result.getType() != type) {
+                throw InvalidAddress((boost::format("Exchange %1% is of incorrect type, expected %2% but got %3%") 
+                                      % name % type % result.getType()).str());
+            }
+            if (durable && !result.getDurable()) {
+                throw InvalidAddress((boost::format("Exchange not durable: %1%") % name).str());
+            }
+            //Note: Can't check auto-delete or alternate-exchange via
+            //exchange-query-result as these are not returned
+            //TODO: could use a passive declare to check alternate-exchange
+            for (FieldTable::ValueMap::const_iterator i = arguments.begin(); i != arguments.end(); ++i) { //every configured argument must be present with an equal value
+                FieldTable::ValuePtr v = result.getArguments().get(i->first);
+                if (!v) {
+                    throw InvalidAddress((boost::format("Option %1% not set for %2%") % i->first % name).str());
+                } else if (*i->second != *v) { //compare the values (as Queue::checkAssert does), not the ValuePtr handles
+                    throw InvalidAddress((boost::format("Option %1% does not match for %2%, expected %3%, got %4%")
+                                          % i->first % name % *(i->second) % *v).str());
+                }
+            }
+        }
+    }
+}
+
+void Exchange::configure(const Address& address)
+{
+    const Variant& properties = address.getOption(NODE_PROPERTIES);
+    if (!properties.isVoid()) {
+        Variant::Map p = properties.asMap();
+        durable = p[DURABLE];
+        autoDelete = p[xamqp::AUTO_DELETE];
+        type = p[xamqp::EXCHANGE_TYPE].asString();
+        alternateExchange = p[xamqp::ALTERNATE_EXCHANGE].asString();
+        if (!p[xamqp::ARGUMENTS].isVoid()) {
+            translate(p[xamqp::ARGUMENTS].asMap(), arguments);
+        }
+    }
+}
+
+
+bool Node::enabled(const Variant& policy, CheckMode mode)
+{
+    bool result = false; //defined value even if mode matches neither case below
+    switch (mode) {
+      case FOR_RECEIVER:
+        result = in(policy, RECEIVER_MODES); //policy is tested against the receiver-side mode list
+        break;
+      case FOR_SENDER:
+        result = in(policy, SENDER_MODES); //policy is tested against the sender-side mode list
+        break;
+    }
+    return result;
+}
+
+bool Node::createEnabled(const Address& address, CheckMode mode)
+{
+    const Variant& policy = address.getOption(CREATE);
+    return enabled(policy, mode);
+}
+
+void Node::convert(const Variant& options, FieldTable& arguments)
+{
+    if (!options.isVoid()) {
+        translate(options.asMap(), arguments);
+    }    
+}
+std::vector<std::string> Node::RECEIVER_MODES = list_of<std::string>(ALWAYS) (RECEIVER);
+std::vector<std::string> Node::SENDER_MODES = list_of<std::string>(ALWAYS) (SENDER);
 
 }}} // namespace qpid::client::amqp0_10

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h Wed Nov 11 13:15:44 2009
@@ -50,13 +50,10 @@
 {
   public:
     std::auto_ptr<MessageSource> resolveSource(qpid::client::Session session,
-                                               const qpid::messaging::Address& address, 
-                                               const qpid::messaging::Filter* filter,
-                                               const qpid::messaging::Variant::Map& options);
-
+                                               const qpid::messaging::Address& address);
+    
     std::auto_ptr<MessageSink> resolveSink(qpid::client::Session session,
-                                           const qpid::messaging::Address& address, 
-                                           const qpid::messaging::Variant::Map& options);
+                                           const qpid::messaging::Address& address);
 
     static qpid::messaging::Address convert(const qpid::framing::ReplyTo&);
     static qpid::framing::ReplyTo convert(const qpid::messaging::Address&);

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h Wed Nov 11 13:15:44 2009
@@ -50,11 +50,11 @@
 
     qpid::sys::Mutex lock;//used to protect data structures
     qpid::sys::Semaphore semaphore;//used to coordinate reconnection
+    Sessions sessions;
     qpid::client::Connection connection;
     std::auto_ptr<FailoverListener> failoverListener;
     qpid::Url url;
     qpid::client::ConnectionSettings settings;
-    Sessions sessions;
     bool reconnectionEnabled;
     int timeout;
     int minRetryInterval;

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp Wed Nov 11 13:15:44 2009
@@ -39,7 +39,7 @@
     message.setData(from.getContent());
     message.getMessageProperties().setContentType(from.getContentType());
     const Address& address = from.getReplyTo();
-    if (!address.value.empty()) {
+    if (address) {
         message.getMessageProperties().setReplyTo(AddressResolution::convert(address));
     }
     translate(from.getHeaders(), message.getMessageProperties().getApplicationHeaders());

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp Wed Nov 11 13:15:44 2009
@@ -103,7 +103,7 @@
 
     session = s;
     if (state == UNRESOLVED) {
-        source = resolver.resolveSource(session, address, filter, options);
+        source = resolver.resolveSource(session, address);
         state = STOPPED;//TODO: if session is started, go straight to started
     }
     if (state == CANCELLED) {
@@ -136,11 +136,9 @@
 }
 
 ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, 
-                           const qpid::messaging::Address& a,
-                           const qpid::messaging::Filter* f, 
-                           const qpid::messaging::Variant::Map& o) : 
+                           const qpid::messaging::Address& a) : 
 
-    parent(p), destination(name), address(a), filter(f), options(o), byteCredit(0xFFFFFFFF), 
+    parent(p), destination(name), address(a), byteCredit(0xFFFFFFFF), 
     state(UNRESOLVED), capacity(0), listener(0), window(0) {}
 
 bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout)

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h Wed Nov 11 13:15:44 2009
@@ -22,7 +22,6 @@
  *
  */
 #include "qpid/messaging/Address.h"
-#include "qpid/messaging/Filter.h"
 #include "qpid/messaging/Message.h"
 #include "qpid/messaging/ReceiverImpl.h"
 #include "qpid/messaging/Variant.h"
@@ -48,9 +47,7 @@
     enum State {UNRESOLVED, STOPPED, STARTED, CANCELLED};
 
     ReceiverImpl(SessionImpl& parent, const std::string& name,
-                 const qpid::messaging::Address& address,
-                 const qpid::messaging::Filter* filter,
-                 const qpid::messaging::Variant::Map& options);
+                 const qpid::messaging::Address& address);
 
     void init(qpid::client::AsyncSession session, AddressResolution& resolver);
     bool get(qpid::messaging::Message& message, qpid::sys::Duration timeout);
@@ -72,8 +69,6 @@
     SessionImpl& parent;
     const std::string destination;
     const qpid::messaging::Address address;
-    const qpid::messaging::Filter* filter;
-    const qpid::messaging::Variant::Map options;
     const uint32_t byteCredit;
     State state;
 

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp Wed Nov 11 13:15:44 2009
@@ -29,9 +29,8 @@
 namespace amqp0_10 {
 
 SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name, 
-                       const qpid::messaging::Address& _address, 
-                       const qpid::messaging::Variant::Map& _options) : 
-    parent(_parent), name(_name), address(_address), options(_options), state(UNRESOLVED),
+                       const qpid::messaging::Address& _address) : 
+    parent(_parent), name(_name), address(_address), state(UNRESOLVED),
     capacity(50), window(0), flushed(false) {}
 
 void SenderImpl::send(const qpid::messaging::Message& message) 
@@ -63,7 +62,7 @@
 {
     session = s;
     if (state == UNRESOLVED) {
-        sink = resolver.resolveSink(session, address, options);
+        sink = resolver.resolveSink(session, address);
         state = ACTIVE;
     }
     if (state == CANCELLED) {

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h Wed Nov 11 13:15:44 2009
@@ -47,8 +47,7 @@
     enum State {UNRESOLVED, ACTIVE, CANCELLED};
 
     SenderImpl(SessionImpl& parent, const std::string& name, 
-               const qpid::messaging::Address& address, 
-               const qpid::messaging::Variant::Map& options);
+               const qpid::messaging::Address& address);
     void send(const qpid::messaging::Message&);
     void cancel();
     void setCapacity(uint32_t);
@@ -60,7 +59,6 @@
     SessionImpl& parent;
     const std::string name;
     const qpid::messaging::Address address;
-    const qpid::messaging::Variant::Map options;
     State state;
     std::auto_ptr<MessageSink> sink;
 

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Wed Nov 11 13:15:44 2009
@@ -28,7 +28,6 @@
 #include "qpid/Exception.h"
 #include "qpid/log/Statement.h"
 #include "qpid/messaging/Address.h"
-#include "qpid/messaging/Filter.h"
 #include "qpid/messaging/Message.h"
 #include "qpid/messaging/MessageImpl.h"
 #include "qpid/messaging/MessageListener.h"
@@ -132,36 +131,22 @@
 {
     qpid::messaging::Receiver result;
     const qpid::messaging::Address& address;
-    const Filter* filter;
-    const qpid::messaging::Variant::Map& options;
     
-    CreateReceiver(SessionImpl& i, const qpid::messaging::Address& a, const Filter* f, 
-                   const qpid::messaging::Variant::Map& o) :
-        Command(i), address(a), filter(f), options(o) {}
-    void operator()() { result = impl.createReceiverImpl(address, filter, options); }
+    CreateReceiver(SessionImpl& i, const qpid::messaging::Address& a) :
+        Command(i), address(a) {}
+    void operator()() { result = impl.createReceiverImpl(address); }
 };
 
-Receiver SessionImpl::createReceiver(const qpid::messaging::Address& address, const VariantMap& options)
-{ 
-    CreateReceiver f(*this, address, 0, options);
-    while (!execute(f)) {}
-    return f.result;
-}
-
-Receiver SessionImpl::createReceiver(const qpid::messaging::Address& address, 
-                                     const Filter& filter, const VariantMap& options)
-{ 
-    CreateReceiver f(*this, address, &filter, options);
-    while (!execute(f)) {}
-    return f.result;
+Receiver SessionImpl::createReceiver(const qpid::messaging::Address& address)
+{
+    return get1<CreateReceiver, Receiver>(address);
 }
 
-Receiver SessionImpl::createReceiverImpl(const qpid::messaging::Address& address,
-                                         const Filter* filter, const VariantMap& options)
+Receiver SessionImpl::createReceiverImpl(const qpid::messaging::Address& address)
 {
-    std::string name = address;
+    std::string name = address.getName();
     getFreeKey(name, receivers);
-    Receiver receiver(new ReceiverImpl(*this, name, address, filter, options));
+    Receiver receiver(new ReceiverImpl(*this, name, address));
     getImplPtr<Receiver, ReceiverImpl>(receiver)->init(session, resolver);
     receivers[name] = receiver;
     return receiver;
@@ -171,26 +156,22 @@
 {
     qpid::messaging::Sender result;
     const qpid::messaging::Address& address;
-    const qpid::messaging::Variant::Map& options;
     
-    CreateSender(SessionImpl& i, const qpid::messaging::Address& a,
-                 const qpid::messaging::Variant::Map& o) :
-        Command(i), address(a), options(o) {}
-    void operator()() { result = impl.createSenderImpl(address, options); }
+    CreateSender(SessionImpl& i, const qpid::messaging::Address& a) :
+        Command(i), address(a) {}
+    void operator()() { result = impl.createSenderImpl(address); }
 };
 
-Sender SessionImpl::createSender(const qpid::messaging::Address& address, const VariantMap& options)
+Sender SessionImpl::createSender(const qpid::messaging::Address& address)
 {
-    CreateSender f(*this, address, options);
-    while (!execute(f)) {}
-    return f.result;
+    return get1<CreateSender, Sender>(address);
 }
 
-Sender SessionImpl::createSenderImpl(const qpid::messaging::Address& address, const VariantMap& options)
+Sender SessionImpl::createSenderImpl(const qpid::messaging::Address& address)
 { 
-    std::string name = address;
+    std::string name = address.getName();
     getFreeKey(name, senders);
-    Sender sender(new SenderImpl(*this, name, address, options));
+    Sender sender(new SenderImpl(*this, name, address));
     getImplPtr<Sender, SenderImpl>(sender)->init(session, resolver);
     senders[name] = sender;
     return sender;

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h Wed Nov 11 13:15:44 2009
@@ -63,13 +63,8 @@
     void sync();
     void flush();
     qpid::messaging::Address createTempQueue(const std::string& baseName);
-    qpid::messaging::Sender createSender(const qpid::messaging::Address& address,
-                                         const qpid::messaging::VariantMap& options);
-    qpid::messaging::Receiver createReceiver(const qpid::messaging::Address& address,
-                                             const qpid::messaging::VariantMap& options);
-    qpid::messaging::Receiver createReceiver(const qpid::messaging::Address& address, 
-                                             const qpid::messaging::Filter& filter,
-                                             const qpid::messaging::VariantMap& options);
+    qpid::messaging::Sender createSender(const qpid::messaging::Address& address);
+    qpid::messaging::Receiver createReceiver(const qpid::messaging::Address& address);
 
     void* getLastConfirmedSent();
     void* getLastConfirmedAcknowledged();
@@ -129,11 +124,8 @@
     void closeImpl();
     void syncImpl();
     void flushImpl();
-    qpid::messaging::Sender createSenderImpl(const qpid::messaging::Address& address, 
-                                             const qpid::messaging::VariantMap& options);
-    qpid::messaging::Receiver createReceiverImpl(const qpid::messaging::Address& address, 
-                                                 const qpid::messaging::Filter* filter, 
-                                                 const qpid::messaging::VariantMap& options);
+    qpid::messaging::Sender createSenderImpl(const qpid::messaging::Address& address);
+    qpid::messaging::Receiver createReceiverImpl(const qpid::messaging::Address& address);
     uint32_t availableImpl(const std::string* destination);
     uint32_t pendingAckImpl(const std::string* destination);
 

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp Wed Nov 11 13:15:44 2009
@@ -19,28 +19,293 @@
  *
  */
 #include "qpid/messaging/Address.h"
+#include "qpid/framing/Uuid.h"
+#include <sstream>
+#include <boost/format.hpp>
 
 namespace qpid {
 namespace messaging {
 
-Address::Address() {}
-Address::Address(const std::string& address) : value(address) {}
-Address::Address(const std::string& address, const std::string& t) : value(address), type(t) {}
-Address::operator const std::string&() const { return value; }
-const std::string& Address::toStr() const { return value; }
-Address::operator bool() const { return !value.empty(); }
-bool Address::operator !() const { return value.empty(); }
+namespace {
+const std::string SUBJECT_DIVIDER = "/"; // placed between name and subject by Address::toStr()
+const std::string SPACE = " "; // NOTE(review): not referenced in the visible part of this file
+const std::string TYPE = "type"; // option key used by Address::getType()/setType()
+}
+class AddressImpl
+{
+  public:
+    std::string name;      // node name; an empty name makes the Address test false
+    std::string subject;   // optional subject; empty means "no subject"
+    Variant::Map options;  // option key -> value
+
+    AddressImpl() {}
+    AddressImpl(const std::string& nodeName, const std::string& subj, const Variant::Map& opts)
+        : name(nodeName), subject(subj), options(opts) {}
+};
+
+class AddressParser
+{
+  public:
+    AddressParser(const std::string&);
+    bool parse(Address& address);
+  private:
+    const std::string& input;        // text being parsed; referenced, not copied
+    std::string::size_type current;  // index of the next unread character
+    static const std::string RESERVED;
+
+    bool eos();
+    bool iswhitespace();
+    bool isreserved();
+    bool error(const std::string& message);
+    bool readChar(char c);
+    bool readWord(std::string& word);
+    bool readKey(std::string& key);
+    bool readSimpleValue(Variant& word);
+    bool readString(Variant& value, char delimiter);
+    bool readQuotedString(Variant& value);
+    bool readValue(Variant& value);
+    bool readKeyValuePair(Variant::Map& map);
+    bool readMap(Variant& value);
+    bool readList(Variant& value);
+};
+
+Address::Address() : impl(new AddressImpl()) {} // starts with empty name, subject and options
+Address::Address(const std::string& address) : impl(new AddressImpl())
+{   // Builds the address by parsing its string form: name [ '/' subject ] [ '{' options '}' ].
+    // parse() throws MalformedAddress on bad input; ~Address() is skipped then, so free impl here.
+    try { AddressParser(address).parse(*this); } catch (...) { delete impl; throw; }
+}
+Address::Address(const std::string& name, const std::string& subject,
+                 const Variant::Map& options, const std::string& type)
+    : impl(new AddressImpl(name, subject, options)) { impl->options[TYPE] = type; } // i.e. setType(type), even when 'type' is empty
+Address::Address(const Address& a) :
+    impl(new AddressImpl(*a.impl)) {} // deep copy: name, subject and options are duplicated, never shared
+Address::~Address() { delete impl; } // every constructor allocates its own AddressImpl
+
+Address& Address::operator=(const Address& a) { *impl = *a.impl; return *this; } // copies the contents into the existing impl; self-assignment is harmless
+
+
+std::string Address::toStr() const
+{
+    // Renders name[/subject][ {options}]; an empty subject or option map is omitted.
+    std::stringstream rendered;
+    rendered << impl->name << (hasSubject() ? SUBJECT_DIVIDER + impl->subject : std::string());
+    if (!impl->options.empty()) rendered << " {" << impl->options << "}";
+    return rendered.str();
+}
+Address::operator bool() const { return !impl->name.empty(); } // true when a name is set
+bool Address::operator !() const { return impl->name.empty(); } // true when the name is empty
+
+const std::string& Address::getName() const { return impl->name; }
+void Address::setName(const std::string& name) { impl->name = name; }
+const std::string& Address::getSubject() const { return impl->subject; }
+bool Address::hasSubject() const { return !(impl->subject.empty()); } // an empty subject counts as absent
+void Address::setSubject(const std::string& subject) { impl->subject = subject; }
+const Variant::Map& Address::getOptions() const { return impl->options; }
+Variant::Map& Address::getOptions() { return impl->options; } // mutable access: edits change this Address
+void Address::setOptions(const Variant::Map& options) { impl->options = options; } // replaces the whole map
+
+
+namespace{
+const Variant EMPTY_VARIANT; // returned by reference from getOption() when the key is missing
+const std::string EMPTY_STRING; // returned by getType() when the type option is void
+}
 
-const std::string TYPE_SEPARATOR(":");
+std::string Address::getType() const
+{
+    const Variant& stored = getOption(TYPE); // getOption() hands back EMPTY_VARIANT when TYPE is absent
+    return !stored.isVoid() ? stored.asString() : EMPTY_STRING;
+}
+void Address::setType(const std::string& type) { impl->options[TYPE] = type; } // the type is kept as an ordinary option
+
+const Variant& Address::getOption(const std::string& key) const
+{
+    // The lookup never inserts; a missing key yields the shared EMPTY_VARIANT.
+    Variant::Map::const_iterator found = impl->options.find(key);
+    return found != impl->options.end() ? found->second : EMPTY_VARIANT;
+}
 
 std::ostream& operator<<(std::ostream& out, const Address& address)
 {
-    if (!address.type.empty()) {
-        out << address.type;
-        out << TYPE_SEPARATOR;
-    }
-    out << address.value;
+    out << address.toStr();
     return out;
 }
 
+InvalidAddress::InvalidAddress(const std::string& msg) : Exception(msg) {}
+
+MalformedAddress::MalformedAddress(const std::string& msg) : Exception(msg) {} // thrown by AddressParser::error()
+
+AddressParser::AddressParser(const std::string& s) : input(s), current(0) {} // keeps a reference to 's', which must outlive the parser
+
+bool AddressParser::error(const std::string& message) // always throws; the bool return only lets callers write 'x || error(...)'
+{
+    throw MalformedAddress(message);//TODO: add more debug detail to error message (position etc)
+}
+
+bool AddressParser::parse(Address& address)
+{
+    // Grammar handled here: name [ '/' subject ] [ '{' options '}' ].
+    // Returns true on success; malformed input is reported via error(), which throws.
+    std::string nodeName;
+    if (!readWord(nodeName)) {
+        // A completely empty input is accepted and leaves 'address' untouched.
+        return input.empty() || error("Expected name");
+    }
+
+    // A name beginning with '#' has Uuid(true).str() prepended (presumably a fresh UUID).
+    if (nodeName.find('#') == 0) nodeName = qpid::framing::Uuid(true).str() + nodeName;
+    address.setName(nodeName);
+
+    if (readChar('/')) {
+        std::string subject;
+        if (!readWord(subject)) return error("Expected subject after /");
+        address.setSubject(subject);
+    }
+    Variant options = Variant::Map();
+    if (readMap(options)) address.setOptions(options.asMap());
+    return true;
+}
+
+bool AddressParser::readList(Variant& value)
+{
+    // Parses '[' value (',' value)* ']' into a list; false if the next token is not '['.
+    if (!readChar('[')) return false;
+    value = Variant::List();
+    // readValue() throws when no value is present, so "[]" has to be recognised first.
+    if (readChar(']')) return true;
+    Variant item;
+    while (readValue(item)) {
+        value.asList().push_back(item);
+        if (!readChar(',')) break;
+    }
+    return readChar(']') || error("Unmatched '['!");
+}
+
+bool AddressParser::readMap(Variant& value)
+{
+    // Parses '{' key:value (',' key:value)* '}' into a map; false if the next token is not '{'.
+    if (!readChar('{')) return false;
+    value = Variant::Map();
+    Variant::Map& entries = value.asMap();
+    bool more = readKeyValuePair(entries);
+    while (more && readChar(',')) more = readKeyValuePair(entries);
+    return readChar('}') || error("Unmatched '{'!");
+}
+
+bool AddressParser::readKeyValuePair(Variant::Map& map)
+{
+    // Reads one "key : value" entry into 'map'.
+    // Returns false when no key is present; a key that is not followed by ':'
+    // and a value is malformed input, which error() reports by throwing.
+    std::string key;
+    if (!readKey(key)) return false;
+
+    Variant value;
+    if (!readChar(':') || !readValue(value)) return error("Bad key-value pair!");
+
+    // operator[] overwrites, so a repeated key keeps the last value read.
+    map[key] = value;
+    return true;
+}
+
+bool AddressParser::readKey(std::string& key) // a key is a bare word, so quoted keys are not accepted
+{
+    return readWord(key);
+}
+
+bool AddressParser::readValue(Variant& value) // never returns false: error() throws when nothing matches
+{
+    if (readSimpleValue(value) || readQuotedString(value)) return true; // scalar forms
+    return readMap(value) || readList(value) || error("Expected value"); // nested forms
+}
+
+bool AddressParser::readString(Variant& value, char delimiter)
+{
+    // Reads a string enclosed in 'delimiter' and stores its contents, without the
+    // delimiters, in 'value'. Returns false if the next token is not 'delimiter';
+    // error() throws when the closing delimiter is missing.
+    // There is no escape syntax: the first later occurrence of 'delimiter' ends
+    // the string.
+    if (!readChar(delimiter)) return false;
+    // readChar() has already consumed the opening delimiter, so the contents begin
+    // at 'current'. Advancing once more here would step over the closing delimiter
+    // of an empty string ('') and wrongly report it as unmatched.
+    const std::string::size_type start = current;
+    while (!eos()) {
+        if (input.at(current) == delimiter) {
+            value = input.substr(start, current - start);
+            ++current; // step past the closing delimiter
+            return true;
+        }
+        ++current;
+    }
+    return error("Unmatched delimiter");
+}
+
+bool AddressParser::readQuotedString(Variant& value) // accepts "double" or 'single' quotes; the closing quote must match the opening one
+{
+    return readString(value, '"') || readString(value, '\'');
+}
+
+bool AddressParser::readSimpleValue(Variant& value)
+{
+    // Reads a bare word and stores it in 'value' as the first type it converts to:
+    // int64, then double, otherwise the word itself as a string.
+    // Returns false when there is no word to read.
+    std::string word;
+    if (!readWord(word)) return false;
+    value = word;
+    try { value = value.asInt64(); return true; } catch (const InvalidConversion&) {}
+    try { value = value.asDouble(); return true; } catch (const InvalidConversion&) {}
+    return true; // neither conversion applied, so the string is kept
+}
+
+bool AddressParser::readWord(std::string& value)
+{
+    // Reads one bare word (a run of characters that are neither whitespace nor
+    // reserved) into 'value'. Returns false, leaving 'value' unchanged, when the
+    // run is empty. Leading whitespace is consumed in either case.
+    for (; !eos() && iswhitespace(); ++current) {}
+
+    const std::string::size_type wordStart = current;
+    for (; !eos(); ++current) {
+        if (iswhitespace() || isreserved()) break;
+    }
+
+    if (current == wordStart) return false;
+    value = input.substr(wordStart, current - wordStart);
+    return true;
+}
+
+bool AddressParser::readChar(char c)
+{
+    // Skips whitespace, then consumes the next character only if it equals 'c'.
+    // Returns true when 'c' was consumed.
+    // Returns false, leaving that character unread, on a mismatch or at the end
+    // of the input. Whitespace that was skipped stays consumed either way.
+    while (!eos() && iswhitespace()) ++current;
+
+    // Whitespace is always skipped before comparing, so a whitespace 'c' never matches.
+    if (eos() || input.at(current) != c) return false;
+
+    ++current;
+    return true;
+}
+
+bool AddressParser::iswhitespace() // whether the character at 'current' is whitespace; 'current' must be within the input
+{
+    return ::isspace(static_cast<unsigned char>(input.at(current))); // cast first: isspace() on a negative char value is undefined
+}
+
+bool AddressParser::isreserved() // whether the character at 'current' is one of RESERVED
+{
+    return RESERVED.find(input.at(current)) != std::string::npos;
+}
+
+bool AddressParser::eos() // true once every character of the input has been consumed
+{
+    return current >= input.size();
+}
+
+const std::string AddressParser::RESERVED = "\'\"{}[],:/"; // characters that end a bare word in readWord()
 }} // namespace qpid::messaging

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp Wed Nov 11 13:15:44 2009
@@ -20,7 +20,6 @@
  */
 #include "qpid/messaging/Session.h"
 #include "qpid/messaging/Address.h"
-#include "qpid/messaging/Filter.h"
 #include "qpid/messaging/Message.h"
 #include "qpid/messaging/Sender.h"
 #include "qpid/messaging/Receiver.h"
@@ -48,30 +47,22 @@
 void Session::reject(Message& m) { impl->reject(m); }
 void Session::close() { impl->close(); }
 
-Sender Session::createSender(const Address& address, const VariantMap& options)
+Sender Session::createSender(const Address& address)
 {
-    return impl->createSender(address, options);
+    return impl->createSender(address);
 }
-Receiver Session::createReceiver(const Address& address, const VariantMap& options)
+Receiver Session::createReceiver(const Address& address)
 {
-    return impl->createReceiver(address, options);
-}
-Receiver Session::createReceiver(const Address& address, const Filter& filter, const VariantMap& options)
-{ 
-    return impl->createReceiver(address, filter, options);
+    return impl->createReceiver(address);
 }
 
-Sender Session::createSender(const std::string& address, const VariantMap& options)
-{ 
-    return impl->createSender(Address(address), options); 
-}
-Receiver Session::createReceiver(const std::string& address, const VariantMap& options)
+Sender Session::createSender(const std::string& address)
 { 
-    return impl->createReceiver(Address(address), options); 
+    return impl->createSender(Address(address)); 
 }
-Receiver Session::createReceiver(const std::string& address, const Filter& filter, const VariantMap& options)
+Receiver Session::createReceiver(const std::string& address)
 { 
-    return impl->createReceiver(Address(address), filter, options); 
+    return impl->createReceiver(Address(address)); 
 }
 
 Address Session::createTempQueue(const std::string& baseName)

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h Wed Nov 11 13:15:44 2009
@@ -23,7 +23,6 @@
  */
 #include "qpid/RefCounted.h"
 #include <string>
-#include "qpid/messaging/Variant.h"
 #include "qpid/sys/Time.h"
 
 namespace qpid {
@@ -53,9 +52,8 @@
     virtual Message fetch(qpid::sys::Duration timeout) = 0;
     virtual bool dispatch(qpid::sys::Duration timeout) = 0;
     virtual Address createTempQueue(const std::string& baseName) = 0;
-    virtual Sender createSender(const Address& address, const VariantMap& options) = 0;
-    virtual Receiver createReceiver(const Address& address, const VariantMap& options) = 0;
-    virtual Receiver createReceiver(const Address& address, const Filter& filter, const VariantMap& options) = 0;
+    virtual Sender createSender(const Address& address) = 0;
+    virtual Receiver createReceiver(const Address& address) = 0;
     virtual uint32_t available() = 0;
     virtual uint32_t pendingAck() = 0;
   private:

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Variant.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Variant.cpp?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Variant.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Variant.cpp Wed Nov 11 13:15:44 2009
@@ -529,6 +529,7 @@
 }
 
 VariantType Variant::getType() const { return impl->getType(); }
+bool Variant::isVoid() const { return impl->getType() == VAR_VOID; }
 bool Variant::asBool() const { return impl->asBool(); }
 uint8_t Variant::asUint8() const { return impl->asUint8(); }
 uint16_t Variant::asUint16() const { return impl->asUint16(); }

Added: qpid/trunk/qpid/cpp/src/tests/Address.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Address.cpp?rev=834869&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Address.cpp (added)
+++ qpid/trunk/qpid/cpp/src/tests/Address.cpp Wed Nov 11 13:15:44 2009
@@ -0,0 +1,91 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/Variant.h"
+
+#include "unit_test.h"
+
+using namespace qpid::messaging;
+
+namespace qpid {
+namespace tests {
+
+QPID_AUTO_TEST_SUITE(AddressSuite)
+
+QPID_AUTO_TEST_CASE(testParseNameOnly) // a lone bare word is taken as the address name
+{
+    Address address("my-topic");
+    BOOST_CHECK_EQUAL(std::string("my-topic"), address.getName());
+}
+
+QPID_AUTO_TEST_CASE(testParseSubject) // "name/subject" is split at the '/'
+{
+    Address address("my-topic/my-subject");
+    BOOST_CHECK_EQUAL(std::string("my-topic"), address.getName());
+    BOOST_CHECK_EQUAL(std::string("my-subject"), address.getSubject());
+}
+
+QPID_AUTO_TEST_CASE(testParseOptions) // a {key:value, ...} map after the name: bare word, number and quoted string values
+{
+    Address address("my-topic {a:bc, x:101, y:'a string'}");
+    BOOST_CHECK_EQUAL(std::string("my-topic"), address.getName());
+    BOOST_CHECK_EQUAL(std::string("bc"), address.getOption("a").asString());
+    BOOST_CHECK_EQUAL((uint16_t) 101, address.getOption("x").asInt64());
+    BOOST_CHECK_EQUAL(std::string("a string"), address.getOption("y").asString());
+}
+
+QPID_AUTO_TEST_CASE(testParseSubjectAndOptions) // subject and option map given together
+{
+    Address address("my-topic/my-subject {a:bc, x:101, y:'a string'}");
+    BOOST_CHECK_EQUAL(std::string("my-topic"), address.getName());
+    BOOST_CHECK_EQUAL(std::string("my-subject"), address.getSubject());
+    BOOST_CHECK_EQUAL(std::string("bc"), address.getOption("a").asString());
+    BOOST_CHECK_EQUAL((uint16_t) 101, address.getOption("x").asInt64());
+    BOOST_CHECK_EQUAL(std::string("a string"), address.getOption("y").asString());
+}
+
+QPID_AUTO_TEST_CASE(testParseNestedOptions) // an option value may itself be a {...} map
+{
+    Address address("my-topic {a:{p:202, q:'another string'}, x:101, y:'a string'}");
+    BOOST_CHECK_EQUAL(std::string("my-topic"), address.getName());
+    BOOST_CHECK_EQUAL((uint16_t) 202, address.getOptions()["a"].asMap()["p"].asInt64());
+    BOOST_CHECK_EQUAL(std::string("another string"), address.getOptions()["a"].asMap()["q"].asString());
+    BOOST_CHECK_EQUAL((uint16_t) 101, address.getOption("x").asInt64());
+    BOOST_CHECK_EQUAL(std::string("a string"), address.getOption("y").asString());
+}
+
+QPID_AUTO_TEST_CASE(testParseOptionsWithList) // an option value may be a [...] list; items keep their order
+{
+    Address address("my-topic {a:[202, 'another string'], x:101}");
+    BOOST_CHECK_EQUAL(std::string("my-topic"), address.getName());
+    Variant::List& list = address.getOptions()["a"].asList();
+    Variant::List::const_iterator i = list.begin();
+    BOOST_CHECK(i != list.end());
+    BOOST_CHECK_EQUAL((uint16_t) 202, i->asInt64());
+    BOOST_CHECK(++i != list.end());
+    BOOST_CHECK_EQUAL(std::string("another string"), i->asString());
+    BOOST_CHECK_EQUAL((uint16_t) 101, address.getOption("x").asInt64());
+}
+
+QPID_AUTO_TEST_SUITE_END()
+
+}}

Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/tests/Makefile.am Wed Nov 11 13:15:44 2009
@@ -114,7 +114,8 @@
 	ReplicationTest.cpp \
 	ClientMessageTest.cpp \
 	PollableCondition.cpp \
-	Variant.cpp
+	Variant.cpp \
+	Address.cpp
 
 if HAVE_XML
 unit_test_SOURCES+= XmlClientSessionTest.cpp



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org