You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2009/08/25 19:57:37 UTC

svn commit: r807731 [2/3] - in /qpid/trunk/qpid/cpp: ./ examples/ examples/messaging/ include/qpid/client/amqp0_10/ include/qpid/messaging/ src/ src/qpid/client/ src/qpid/client/amqp0_10/ src/qpid/messaging/ src/tests/

Added: qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h (added)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h Tue Aug 25 17:57:34 2009
@@ -0,0 +1,99 @@
+#ifndef QPID_MESSAGING_SESSION_H
+#define QPID_MESSAGING_SESSION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/ClientImportExport.h"
+#include "qpid/client/Handle.h"
+#include "qpid/sys/Time.h"
+#include "Variant.h"
+
+namespace qpid {
+namespace client {
+
+template <class> class PrivateImplRef;
+
+}
+
+namespace messaging {
+
+class Address;
+class Filter;
+class Message;
+class MessageListener;
+class Sender;
+class Receiver;
+class SessionImpl;
+class Subscription;
+
+/**
+ * A session represents a distinct 'conversation' which can involve
+ * sending and receiving messages from different sources and sinks.
+ */
+class Session : public qpid::client::Handle<SessionImpl>
+{
+  public:
+    QPID_CLIENT_EXTERN Session(SessionImpl* impl = 0);
+    QPID_CLIENT_EXTERN Session(const Session&);
+    QPID_CLIENT_EXTERN ~Session();
+    QPID_CLIENT_EXTERN Session& operator=(const Session&);
+
+    QPID_CLIENT_EXTERN void close();
+
+    QPID_CLIENT_EXTERN void commit();
+    QPID_CLIENT_EXTERN void rollback();
+
+    /**
+     * Acknowledges all outstanding messages that have been received
+     * by the application on this session.
+     */
+    QPID_CLIENT_EXTERN void acknowledge();
+    /**
+     * Rejects the specified message. This will prevent the message
+     * being redelivered.
+     */
+    QPID_CLIENT_EXTERN void reject(Message&);
+
+    QPID_CLIENT_EXTERN void sync();
+    QPID_CLIENT_EXTERN void flush();
+
+    QPID_CLIENT_EXTERN bool fetch(Message& message, qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE);
+    QPID_CLIENT_EXTERN Message fetch(qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE);
+    QPID_CLIENT_EXTERN bool dispatch(qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE);
+
+    QPID_CLIENT_EXTERN Sender createSender(const Address& address, const VariantMap& options = VariantMap());
+    QPID_CLIENT_EXTERN Sender createSender(const std::string& address, const VariantMap& options = VariantMap());
+
+    QPID_CLIENT_EXTERN Receiver createReceiver(const Address& address, const VariantMap& options = VariantMap());
+    QPID_CLIENT_EXTERN Receiver createReceiver(const Address& address, const Filter& filter, const VariantMap& options = VariantMap());
+    QPID_CLIENT_EXTERN Receiver createReceiver(const std::string& address, const VariantMap& options = VariantMap());
+    QPID_CLIENT_EXTERN Receiver createReceiver(const std::string& address, const Filter& filter, const VariantMap& options = VariantMap());
+
+    QPID_CLIENT_EXTERN Address createTempQueue(const std::string& baseName = std::string());
+
+    QPID_CLIENT_EXTERN void* getLastConfirmedSent();
+    QPID_CLIENT_EXTERN void* getLastConfirmedAcknowledged();
+  private:
+  friend class qpid::client::PrivateImplRef<Session>;
+};
+}} // namespace qpid::messaging
+
+#endif  /*!QPID_MESSAGING_SESSION_H*/

Added: qpid/trunk/qpid/cpp/include/qpid/messaging/Variant.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Variant.h?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Variant.h (added)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Variant.h Tue Aug 25 17:57:34 2009
@@ -0,0 +1,167 @@
+#ifndef QPID_MESSAGING_VARIANT_H
+#define QPID_MESSAGING_VARIANT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <list>
+#include <map>
+#include <ostream>
+#include <string>
+#include "qpid/Exception.h"
+#include "qpid/sys/IntegerTypes.h"
+#include "qpid/client/ClientImportExport.h"
+
+namespace qpid {
+namespace client {
+}
+
+namespace messaging {
+
+/**
+ * Thrown when an illegal conversion of a variant is attempted.
+ */
+struct InvalidConversion : public qpid::Exception 
+{
+    InvalidConversion(const std::string& msg);
+};
+
+enum VariantType {
+    VOID = 0,
+    BOOL,
+    UINT8,
+    UINT16,
+    UINT32,
+    UINT64,
+    INT8,
+    INT16,
+    INT32,
+    INT64,
+    FLOAT,
+    DOUBLE,
+    STRING,
+    MAP,
+    LIST
+};
+
+class VariantImpl;
+
+/**
+ * Represents a value of variable type.
+ */
+class Variant
+{
+  public:
+    typedef std::map<std::string, Variant> Map;
+    typedef std::list<Variant> List;
+
+    QPID_CLIENT_EXTERN Variant();
+    QPID_CLIENT_EXTERN Variant(bool);
+    QPID_CLIENT_EXTERN Variant(uint8_t);
+    QPID_CLIENT_EXTERN Variant(uint16_t);
+    QPID_CLIENT_EXTERN Variant(uint32_t);
+    QPID_CLIENT_EXTERN Variant(uint64_t);
+    QPID_CLIENT_EXTERN Variant(int8_t);
+    QPID_CLIENT_EXTERN Variant(int16_t);
+    QPID_CLIENT_EXTERN Variant(int32_t);
+    QPID_CLIENT_EXTERN Variant(int64_t);
+    QPID_CLIENT_EXTERN Variant(float);
+    QPID_CLIENT_EXTERN Variant(double);
+    QPID_CLIENT_EXTERN Variant(const std::string&);
+    QPID_CLIENT_EXTERN Variant(const char*);
+    QPID_CLIENT_EXTERN Variant(const Map&);
+    QPID_CLIENT_EXTERN Variant(const List&);
+    QPID_CLIENT_EXTERN Variant(const Variant&);
+
+    QPID_CLIENT_EXTERN ~Variant();
+
+    QPID_CLIENT_EXTERN VariantType getType() const;
+    
+    QPID_CLIENT_EXTERN Variant& operator=(bool);
+    QPID_CLIENT_EXTERN Variant& operator=(uint8_t);
+    QPID_CLIENT_EXTERN Variant& operator=(uint16_t);
+    QPID_CLIENT_EXTERN Variant& operator=(uint32_t);
+    QPID_CLIENT_EXTERN Variant& operator=(uint64_t);
+    QPID_CLIENT_EXTERN Variant& operator=(int8_t);
+    QPID_CLIENT_EXTERN Variant& operator=(int16_t);
+    QPID_CLIENT_EXTERN Variant& operator=(int32_t);
+    QPID_CLIENT_EXTERN Variant& operator=(int64_t);
+    QPID_CLIENT_EXTERN Variant& operator=(float);
+    QPID_CLIENT_EXTERN Variant& operator=(double);
+    QPID_CLIENT_EXTERN Variant& operator=(const std::string&);
+    QPID_CLIENT_EXTERN Variant& operator=(const char*);
+    QPID_CLIENT_EXTERN Variant& operator=(const Map&);
+    QPID_CLIENT_EXTERN Variant& operator=(const List&);
+    QPID_CLIENT_EXTERN Variant& operator=(const Variant&);
+
+    QPID_CLIENT_EXTERN bool asBool() const;
+    QPID_CLIENT_EXTERN uint8_t asUint8() const;
+    QPID_CLIENT_EXTERN uint16_t asUint16() const;
+    QPID_CLIENT_EXTERN uint32_t asUint32() const;
+    QPID_CLIENT_EXTERN uint64_t asUint64() const;
+    QPID_CLIENT_EXTERN int8_t asInt8() const;
+    QPID_CLIENT_EXTERN int16_t asInt16() const;
+    QPID_CLIENT_EXTERN int32_t asInt32() const;
+    QPID_CLIENT_EXTERN int64_t asInt64() const;
+    QPID_CLIENT_EXTERN float asFloat() const;
+    QPID_CLIENT_EXTERN double asDouble() const;
+    QPID_CLIENT_EXTERN std::string asString() const;
+
+    QPID_CLIENT_EXTERN operator bool() const;
+    QPID_CLIENT_EXTERN operator uint8_t() const;
+    QPID_CLIENT_EXTERN operator uint16_t() const;
+    QPID_CLIENT_EXTERN operator uint32_t() const;
+    QPID_CLIENT_EXTERN operator uint64_t() const;
+    QPID_CLIENT_EXTERN operator int8_t() const;
+    QPID_CLIENT_EXTERN operator int16_t() const;
+    QPID_CLIENT_EXTERN operator int32_t() const;
+    QPID_CLIENT_EXTERN operator int64_t() const;
+    QPID_CLIENT_EXTERN operator float() const;
+    QPID_CLIENT_EXTERN operator double() const;
+    QPID_CLIENT_EXTERN operator const char*() const;
+
+    QPID_CLIENT_EXTERN const Map& asMap() const;
+    QPID_CLIENT_EXTERN Map& asMap();
+    QPID_CLIENT_EXTERN const List& asList() const;
+    QPID_CLIENT_EXTERN List& asList();
+    /**
+     * Unlike asString(), getString() will not do any conversions and
+     * will throw InvalidConversion if the type is not STRING.
+     */
+    QPID_CLIENT_EXTERN const std::string& getString() const;
+    QPID_CLIENT_EXTERN std::string& getString();
+
+    QPID_CLIENT_EXTERN void setEncoding(const std::string&);
+    QPID_CLIENT_EXTERN const std::string& getEncoding() const;
+
+    QPID_CLIENT_EXTERN void reset();    
+  private:
+    VariantImpl* impl;
+};
+
+QPID_CLIENT_EXTERN std::ostream& operator<<(std::ostream& out, const Variant& value);
+QPID_CLIENT_EXTERN std::ostream& operator<<(std::ostream& out, const Variant::Map& map);
+QPID_CLIENT_EXTERN std::ostream& operator<<(std::ostream& out, const Variant::List& list);
+
+typedef Variant::Map VariantMap;
+
+}} // namespace qpid::messaging
+
+#endif  /*!QPID_MESSAGING_VARIANT_H*/

Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=807731&r1=807730&r2=807731&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Tue Aug 25 17:57:34 2009
@@ -523,6 +523,35 @@
      qpid/client/SubscriptionImpl.cpp
      qpid/client/SubscriptionManager.cpp
      qpid/client/SubscriptionManagerImpl.cpp
+     qpid/messaging/Address.cpp
+     qpid/messaging/Connection.cpp
+     qpid/messaging/ConnectionImpl.h
+     qpid/messaging/Filter.cpp
+     qpid/messaging/Message.cpp
+     qpid/messaging/Receiver.cpp
+     qpid/messaging/ReceiverImpl.h
+     qpid/messaging/Session.cpp
+     qpid/messaging/SessionImpl.h
+     qpid/messaging/Sender.cpp
+     qpid/messaging/SenderImpl.h
+     qpid/messaging/Variant.cpp
+     qpid/client/amqp0_10/AddressResolution.h
+     qpid/client/amqp0_10/AddressResolution.cpp
+     qpid/client/amqp0_10/Codecs.cpp
+     qpid/client/amqp0_10/CompletionTracker.h
+     qpid/client/amqp0_10/CompletionTracker.cpp
+     qpid/client/amqp0_10/ConnectionImpl.h
+     qpid/client/amqp0_10/ConnectionImpl.cpp
+     qpid/client/amqp0_10/IncomingMessages.h
+     qpid/client/amqp0_10/IncomingMessages.cpp
+     qpid/client/amqp0_10/MessageSink.h
+     qpid/client/amqp0_10/MessageSource.h
+     qpid/client/amqp0_10/ReceiverImpl.h
+     qpid/client/amqp0_10/ReceiverImpl.cpp
+     qpid/client/amqp0_10/SessionImpl.h
+     qpid/client/amqp0_10/SessionImpl.cpp
+     qpid/client/amqp0_10/SenderImpl.h
+     qpid/client/amqp0_10/SenderImpl.cpp
 )
 add_library (qpidclient SHARED ${libqpidclient_SOURCES})
 target_link_libraries (qpidclient qpidcommon)

Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=807731&r1=807730&r2=807731&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Tue Aug 25 17:57:34 2009
@@ -681,7 +681,36 @@
   qpid/client/SubscriptionImpl.h		\
   qpid/client/SubscriptionManager.cpp		\
   qpid/client/SubscriptionManagerImpl.cpp	\
-  qpid/client/SubscriptionManagerImpl.h
+  qpid/client/SubscriptionManagerImpl.h		\
+  qpid/messaging/Address.cpp			\
+  qpid/messaging/Connection.cpp			\
+  qpid/messaging/Filter.cpp			\
+  qpid/messaging/Message.cpp			\
+  qpid/messaging/Sender.cpp			\
+  qpid/messaging/Receiver.cpp			\
+  qpid/messaging/Session.cpp			\
+  qpid/messaging/Variant.cpp			\
+  qpid/messaging/ConnectionImpl.h 		\
+  qpid/messaging/SenderImpl.h			\
+  qpid/messaging/ReceiverImpl.h			\
+  qpid/messaging/SessionImpl.h			\
+  qpid/client/amqp0_10/AddressResolution.h	\
+  qpid/client/amqp0_10/AddressResolution.cpp	\
+  qpid/client/amqp0_10/Codecs.cpp		\
+  qpid/client/amqp0_10/ConnectionImpl.h	        \
+  qpid/client/amqp0_10/ConnectionImpl.cpp	\
+  qpid/client/amqp0_10/CompletionTracker.h	\
+  qpid/client/amqp0_10/CompletionTracker.cpp	\
+  qpid/client/amqp0_10/IncomingMessages.h	\
+  qpid/client/amqp0_10/IncomingMessages.cpp	\
+  qpid/client/amqp0_10/MessageSink.h		\
+  qpid/client/amqp0_10/MessageSource.h		\
+  qpid/client/amqp0_10/ReceiverImpl.h		\
+  qpid/client/amqp0_10/ReceiverImpl.cpp		\
+  qpid/client/amqp0_10/SessionImpl.h		\
+  qpid/client/amqp0_10/SessionImpl.cpp		\
+  qpid/client/amqp0_10/SenderImpl.h		\
+  qpid/client/amqp0_10/SenderImpl.cpp
 
 # NOTE: only public header files (which should be in ../include)
 # should go in this list. Private headers should go in the SOURCES
@@ -751,7 +780,19 @@
   ../include/qpid/sys/SystemInfo.h		\
   ../include/qpid/sys/Thread.h			\
   ../include/qpid/sys/Time.h			\
-  ../include/qpid/sys/uuid.h
+  ../include/qpid/sys/uuid.h			\
+  ../include/qpid/messaging/Address.h 		\
+  ../include/qpid/messaging/Connection.h 	\
+  ../include/qpid/messaging/Codec.h 	        \
+  ../include/qpid/messaging/Filter.h 		\
+  ../include/qpid/messaging/Message.h 		\
+  ../include/qpid/messaging/MessageContent.h 	\
+  ../include/qpid/messaging/MessageListener.h 	\
+  ../include/qpid/messaging/Sender.h 		\
+  ../include/qpid/messaging/Receiver.h 	        \
+  ../include/qpid/messaging/Session.h 		\
+  ../include/qpid/messaging/Variant.h 		\
+  ../include/qpid/client/amqp0_10/Codecs.h
 
 # Force build of qpidd during dist phase so help2man will work.
 dist-hook: $(BUILT_SOURCES)

Modified: qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=807731&r1=807730&r2=807731&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp Tue Aug 25 17:57:34 2009
@@ -202,6 +202,16 @@
     return f.result;
 }
 
+framing::SequenceNumber SessionImpl::getCompleteUpTo()
+{
+    SequenceNumber firstIncomplete;
+    {
+        Lock l(state);
+        firstIncomplete = incompleteIn.front();
+    }
+    return --firstIncomplete;
+}
+
 struct MarkCompleted 
 {
     const SequenceNumber& id;

Modified: qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h?rev=807731&r1=807730&r2=807731&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h Tue Aug 25 17:57:34 2009
@@ -103,6 +103,7 @@
     void markCompleted(const framing::SequenceSet& ids, bool notifyPeer);
     bool isComplete(const framing::SequenceNumber& id);
     bool isCompleteUpTo(const framing::SequenceNumber& id);
+    framing::SequenceNumber getCompleteUpTo();
     void waitForCompletion(const framing::SequenceNumber& id);
     void sendCompletion();
     void sendFlush();

Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp Tue Aug 25 17:57:34 2009
@@ -0,0 +1,464 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/amqp0_10/AddressResolution.h"
+#include "qpid/client/amqp0_10/Codecs.h"
+#include "qpid/client/amqp0_10/MessageSource.h"
+#include "qpid/client/amqp0_10/MessageSink.h"
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/Filter.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/Exception.h"
+#include "qpid/log/Statement.h"
+#include "qpid/framing/enum.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/ReplyTo.h"
+#include "qpid/framing/reply_exceptions.h"
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+using qpid::Exception;
+using qpid::messaging::Address;
+using qpid::messaging::Filter;
+using qpid::messaging::Variant;
+using qpid::framing::FieldTable;
+using qpid::framing::ReplyTo;
+using namespace qpid::framing::message;
+
+
+namespace{
+const Variant EMPTY_VARIANT;
+const FieldTable EMPTY_FIELD_TABLE;
+const std::string EMPTY_STRING;
+
+//option names
+const std::string BROWSE("browse");
+const std::string EXCLUSIVE("exclusive");
+const std::string MODE("mode");
+const std::string NAME("name");
+const std::string UNACKNOWLEDGED("unacknowledged");
+
+const std::string QUEUE_ADDRESS("queue");
+const std::string TOPIC_ADDRESS("topic");
+const std::string TOPIC_ADDRESS_AND_SUBJECT("topic+");
+const std::string DIVIDER("/");
+
+const std::string SIMPLE_SUBSCRIPTION("simple");
+const std::string RELIABLE_SUBSCRIPTION("reliable");
+const std::string DURABLE_SUBSCRIPTION("durable");
+}
+
+class QueueSource : public MessageSource
+{
+  public:
+    QueueSource(const std::string& name, AcceptMode=ACCEPT_MODE_EXPLICIT, AcquireMode=ACQUIRE_MODE_PRE_ACQUIRED, 
+                bool exclusive = false, const FieldTable& options = EMPTY_FIELD_TABLE);
+    void subscribe(qpid::client::AsyncSession& session, const std::string& destination);
+    void cancel(qpid::client::AsyncSession& session, const std::string& destination);
+  private:
+    const std::string name;
+    const AcceptMode acceptMode;
+    const AcquireMode acquireMode;
+    const bool exclusive;
+    const FieldTable options;
+};
+
+class Subscription : public MessageSource
+{
+  public:
+    enum SubscriptionMode {SIMPLE, RELIABLE, DURABLE};
+
+    Subscription(const std::string& name, SubscriptionMode mode = SIMPLE,
+                 const FieldTable& queueOptions = EMPTY_FIELD_TABLE, const FieldTable& subscriptionOptions = EMPTY_FIELD_TABLE);
+    void add(const std::string& exchange, const std::string& key, const FieldTable& options = EMPTY_FIELD_TABLE);
+    void subscribe(qpid::client::AsyncSession& session, const std::string& destination);
+    void cancel(qpid::client::AsyncSession& session, const std::string& destination);
+
+    static SubscriptionMode getMode(const std::string& mode);
+  private:
+    struct Binding
+    {
+        Binding(const std::string& exchange, const std::string& key, const FieldTable& options = EMPTY_FIELD_TABLE);
+
+        std::string exchange;
+        std::string key;
+        FieldTable options;
+    };
+
+    typedef std::vector<Binding> Bindings;
+
+    const std::string name;
+    const bool autoDelete;
+    const bool durable;
+    const FieldTable queueOptions;
+    const FieldTable subscriptionOptions;
+    Bindings bindings;
+    std::string queue;
+};
+
+class Exchange : public MessageSink
+{
+  public:
+    Exchange(const std::string& name, const std::string& defaultSubject = EMPTY_STRING, 
+             bool passive = true, const std::string& type = EMPTY_STRING, bool durable = false, 
+             const FieldTable& options = EMPTY_FIELD_TABLE);
+    void declare(qpid::client::AsyncSession& session, const std::string& name);
+    void send(qpid::client::AsyncSession& session, const std::string& name, qpid::messaging::Message& message);
+    void cancel(qpid::client::AsyncSession& session, const std::string& name);
+  private:
+    const std::string name;
+    const std::string defaultSubject;
+    const bool passive;
+    const std::string type;
+    const bool durable;
+    const FieldTable options;
+};
+
+class QueueSink : public MessageSink
+{
+  public:
+    QueueSink(const std::string& name, bool passive=true, bool exclusive=false, 
+              bool autoDelete=false, bool durable=false, const FieldTable& options = EMPTY_FIELD_TABLE);
+    void declare(qpid::client::AsyncSession& session, const std::string& name);
+    void send(qpid::client::AsyncSession& session, const std::string& name, qpid::messaging::Message& message);
+    void cancel(qpid::client::AsyncSession& session, const std::string& name);
+  private:
+    const std::string name;
+    const bool passive;
+    const bool exclusive;
+    const bool autoDelete;
+    const bool durable;
+    const FieldTable options;
+};
+
+
+bool isQueue(qpid::client::Session session, const qpid::messaging::Address& address);
+bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address, std::string& subject);
+
+const Variant& getOption(const std::string& key, const Variant::Map& options)
+{
+    Variant::Map::const_iterator i = options.find(key);
+    if (i == options.end()) return EMPTY_VARIANT;
+    else return i->second;
+}
+
+std::auto_ptr<MessageSource> AddressResolution::resolveSource(qpid::client::Session session,
+                                                              const Address& address, 
+                                                              const Filter* filter,
+                                                              const Variant::Map& options)
+{
+    //TODO: handle case where there exists a queue and an exchange of
+    //the same name (hence an unqualified address is ambiguous)
+    
+    //TODO: make sure specified address type gives sane error message
+    //if it does npt match the configuration on server
+
+    if (isQueue(session, address)) {
+        //TODO: support auto-created queue as source, if requested by specific option
+
+        AcceptMode accept = getOption(UNACKNOWLEDGED, options).asBool() ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT;
+        AcquireMode acquire = getOption(BROWSE, options).asBool() ? ACQUIRE_MODE_NOT_ACQUIRED : ACQUIRE_MODE_PRE_ACQUIRED;
+        bool exclusive = getOption(EXCLUSIVE, options).asBool();
+        FieldTable arguments;
+        //TODO: extract subscribe arguments from options (e.g. either
+        //filter out already processed keys and send the rest, or have
+        //a nested map)
+
+        std::auto_ptr<MessageSource> source = 
+            std::auto_ptr<MessageSource>(new QueueSource(address.value, accept, acquire, exclusive, arguments));
+        return source;
+    } else {
+        //TODO: extract queue options (e.g. no-local) and subscription options (e.g. less important)
+        std::auto_ptr<Subscription> bindings = 
+            std::auto_ptr<Subscription>(new Subscription(getOption(NAME, options).asString(), 
+                                                         Subscription::getMode(getOption(MODE, options).asString())));
+
+        qpid::framing::ExchangeQueryResult result = session.exchangeQuery(address.value);
+        if (result.getNotFound()) {
+            throw qpid::framing::NotFoundException(QPID_MSG("Address not known: " << address));
+        } else if (result.getType() == "topic") {
+            if (filter) {
+                if (filter->type != Filter::WILDCARD) {
+                    throw qpid::framing::NotImplementedException(
+                        QPID_MSG("Filters of type " << filter->type << " not supported by address " << address));
+                    
+                }
+                for (std::vector<std::string>::const_iterator i = filter->patterns.begin(); i != filter->patterns.end(); i++) {
+                    bindings->add(address.value, *i, qpid::framing::FieldTable());
+                }
+            } else {
+                //default is to receive all messages
+                bindings->add(address.value, "*", qpid::framing::FieldTable());
+            }
+        } else if (result.getType() == "fanout") {
+            if (filter) {
+                throw qpid::framing::NotImplementedException(QPID_MSG("Filters are not supported by address " << address));                
+            }            
+            bindings->add(address.value, address.value, qpid::framing::FieldTable());
+        } else if (result.getType() == "direct") {
+            //TODO: ????
+        } else {
+            //TODO: xml and headers exchanges
+            throw qpid::framing::NotImplementedException(QPID_MSG("Address type not recognised for " << address));                
+        }
+        std::auto_ptr<MessageSource> source = std::auto_ptr<MessageSource>(bindings.release());
+        return source;
+    }
+}
+
+
+std::auto_ptr<MessageSink> AddressResolution::resolveSink(qpid::client::Session session,
+                                                          const qpid::messaging::Address& address, 
+                                                          const qpid::messaging::Variant::Map& /*options*/)
+{
+    std::auto_ptr<MessageSink> sink;
+    if (isQueue(session, address)) {
+        //TODO: support for auto-created queues as sink
+        sink = std::auto_ptr<MessageSink>(new QueueSink(address.value));
+    } else {
+        std::string subject;
+        if (isTopic(session, address, subject)) {
+            //TODO: support for auto-created exchanges as sink
+            sink = std::auto_ptr<MessageSink>(new Exchange(address.value, subject));
+        } else {
+            if (address.type.empty()) {
+                throw qpid::framing::NotFoundException(QPID_MSG("Address not known: " << address));
+            } else {
+                throw qpid::framing::NotImplementedException(QPID_MSG("Address type not recognised: " << address.type));
+            }
+        }
+    }
+    return sink;
+}
+
+QueueSource::QueueSource(const std::string& _name, AcceptMode _acceptMode, AcquireMode _acquireMode, bool _exclusive, const FieldTable& _options) :
+    name(_name), acceptMode(_acceptMode), acquireMode(_acquireMode), exclusive(_exclusive), options(_options) {}
+
+void QueueSource::subscribe(qpid::client::AsyncSession& session, const std::string& destination)
+{
+    session.messageSubscribe(arg::queue=name, 
+                             arg::destination=destination,
+                             arg::acceptMode=acceptMode,
+                             arg::acquireMode=acquireMode,
+                             arg::exclusive=exclusive,
+                             arg::arguments=options);
+}
+
+void QueueSource::cancel(qpid::client::AsyncSession& session, const std::string& destination)
+{
+    session.messageCancel(destination);
+}
+
+Subscription::Subscription(const std::string& _name, SubscriptionMode mode, const FieldTable& qOptions, const FieldTable& sOptions)
+    : name(_name), autoDelete(mode == SIMPLE), durable(mode == DURABLE), 
+      queueOptions(qOptions), subscriptionOptions(sOptions) {}
+
+void Subscription::add(const std::string& exchange, const std::string& key, const FieldTable& options)
+{
+    bindings.push_back(Binding(exchange, key, options));
+}
+
+void Subscription::subscribe(qpid::client::AsyncSession& session, const std::string& destination)
+{
+    if (name.empty()) {
+        //TODO: use same scheme as JMS client for subscription queue name generation?
+        queue = session.getId().getName() + destination;
+    } else {
+        queue = name;
+    }
+    session.queueDeclare(arg::queue=queue, arg::exclusive=true, 
+                         arg::autoDelete=autoDelete, arg::durable=durable, arg::arguments=queueOptions);
+    for (Bindings::const_iterator i = bindings.begin(); i != bindings.end(); ++i) {
+        session.exchangeBind(arg::queue=queue, arg::exchange=i->exchange, arg::bindingKey=i->key, arg::arguments=i->options);
+    }
+    AcceptMode accept = autoDelete ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT;
+    session.messageSubscribe(arg::queue=queue, arg::destination=destination, 
+                             arg::exclusive=true, arg::acceptMode=accept, arg::arguments=subscriptionOptions);
+}
+
+void Subscription::cancel(qpid::client::AsyncSession& session, const std::string& destination)
+{
+    session.messageCancel(destination);
+    session.queueDelete(arg::queue=queue);
+}
+
+Subscription::Binding::Binding(const std::string& e, const std::string& k, const FieldTable& o):
+    exchange(e), key(k), options(o) {}
+
+Subscription::SubscriptionMode Subscription::getMode(const std::string& s)
+{
+    if (s.empty() || s == SIMPLE_SUBSCRIPTION) return SIMPLE;
+    else if (s == RELIABLE_SUBSCRIPTION) return RELIABLE;
+    else if (s == DURABLE_SUBSCRIPTION) return DURABLE;
+    else throw Exception(QPID_MSG("Unrecognised subscription mode: " << s)); 
+}
+
+void convert(qpid::messaging::Message& from, qpid::client::Message& to);
+
+Exchange::Exchange(const std::string& _name, const std::string& _defaultSubject, 
+         bool _passive, const std::string& _type, bool _durable, const FieldTable& _options) : 
+    name(_name), defaultSubject(_defaultSubject), passive(_passive), type(_type), durable(_durable), options(_options) {}
+
+void Exchange::declare(qpid::client::AsyncSession& session, const std::string&)
+{
+    //TODO: should this really by synchronous? want to get error if not valid...
+    if (passive) {
+        sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true);
+    } else {
+        sync(session).exchangeDeclare(arg::exchange=name, arg::type=type, arg::durable=durable, arg::arguments=options);
+    }
+}
+
+void Exchange::send(qpid::client::AsyncSession& session, const std::string&, qpid::messaging::Message& m)
+{
+    qpid::client::Message message;
+    convert(m, message);
+    if (message.getDeliveryProperties().getRoutingKey().empty() && !defaultSubject.empty()) {
+        message.getDeliveryProperties().setRoutingKey(defaultSubject);
+    }
+    session.messageTransfer(arg::destination=name, arg::content=message);
+}
+
+void Exchange::cancel(qpid::client::AsyncSession&, const std::string&) {}
+
+QueueSink::QueueSink(const std::string& _name, bool _passive, bool _exclusive, 
+                     bool _autoDelete, bool _durable, const FieldTable& _options) :
+    name(_name), passive(_passive), exclusive(_exclusive), 
+    autoDelete(_autoDelete), durable(_durable), options(_options) {}
+
+void QueueSink::declare(qpid::client::AsyncSession& session, const std::string&)
+{
+    //TODO: should this really by synchronous?
+    if (passive) {
+        sync(session).queueDeclare(arg::queue=name, arg::passive=true);
+    } else {
+        sync(session).queueDeclare(arg::queue=name, arg::exclusive=exclusive, arg::durable=durable, 
+                                   arg::autoDelete=autoDelete, arg::arguments=options);
+    }
+}
+void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, qpid::messaging::Message& m)
+{
+    qpid::client::Message message;
+    convert(m, message);
+    message.getDeliveryProperties().setRoutingKey(name);
+    session.messageTransfer(arg::content=message);
+}
+
+void QueueSink::cancel(qpid::client::AsyncSession&, const std::string&) {}
+
+template <class T> void encode(qpid::messaging::Message& from)
+{
+    T codec;
+    from.encode(codec);
+    from.setContentType(T::contentType);
+}
+
+void translate(const Variant::Map& from, FieldTable& to);//implementation in Codecs.cpp
+
+void convert(qpid::messaging::Message& from, qpid::client::Message& to)
+{
+    //TODO: need to avoid copying as much as possible
+    if (from.getContent().isList()) encode<ListCodec>(from);
+    if (from.getContent().isMap())  encode<MapCodec>(from);
+    to.setData(from.getBytes());
+    to.getDeliveryProperties().setRoutingKey(from.getSubject());
+    //TODO: set other delivery properties
+    to.getMessageProperties().setContentType(from.getContentType());
+    const Address& address = from.getReplyTo();
+    if (!address.value.empty()) {
+        to.getMessageProperties().setReplyTo(AddressResolution::convert(address));
+    }
+    translate(from.getHeaders(), to.getMessageProperties().getApplicationHeaders());
+    //TODO: set other message properties
+}
+
+Address AddressResolution::convert(const qpid::framing::ReplyTo& rt)
+{
+    if (rt.getExchange().empty()) {
+        if (rt.getRoutingKey().empty()) {
+            return Address();//empty address
+        } else {
+            return Address(rt.getRoutingKey(), QUEUE_ADDRESS);
+        }
+    } else {
+        if (rt.getRoutingKey().empty()) {
+            return Address(rt.getExchange(), TOPIC_ADDRESS);
+        } else {
+            return Address(rt.getExchange() + DIVIDER + rt.getRoutingKey(), TOPIC_ADDRESS_AND_SUBJECT);
+        }
+    }    
+}
+
+qpid::framing::ReplyTo AddressResolution::convert(const Address& address)
+{
+    if (address.type == QUEUE_ADDRESS || address.type.empty()) {
+        return ReplyTo(EMPTY_STRING, address.value);
+    } else if (address.type == TOPIC_ADDRESS) {
+        return ReplyTo(address.value, EMPTY_STRING);
+    } else if (address.type == TOPIC_ADDRESS_AND_SUBJECT) {
+        //need to split the value
+        string::size_type i = address.value.find(DIVIDER);
+        if (i != string::npos) {
+            std::string exchange = address.value.substr(0, i);
+            std::string routingKey;
+            if (i+1 < address.value.size()) {
+                routingKey = address.value.substr(i+1);
+            } 
+            return ReplyTo(exchange, routingKey);
+        } else {
+            return ReplyTo(address.value, EMPTY_STRING);
+        }
+    } else {
+        QPID_LOG(notice, "Unrecognised type for reply-to: " << address.type);
+        //treat as queue
+        return ReplyTo(EMPTY_STRING, address.value);
+    }
+}
+
+bool isQueue(qpid::client::Session session, const qpid::messaging::Address& address) 
+{
+    return address.type == QUEUE_ADDRESS || 
+        (address.type.empty() && session.queueQuery(address.value).getQueue() == address.value);
+}
+
+bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address, std::string& subject)
+{
+    if (address.type.empty()) {
+        return !session.exchangeQuery(address.value).getNotFound();
+    } else if (address.type == TOPIC_ADDRESS) {
+        return true;
+    } else if (address.type == TOPIC_ADDRESS_AND_SUBJECT) {
+        string::size_type i = address.value.find(DIVIDER);
+        if (i != string::npos) {
+            std::string exchange = address.value.substr(0, i);
+            if (i+1 < address.value.size()) {
+                subject = address.value.substr(i+1);
+            } 
+        }
+        return true;
+    } else {
+        return false;
+    }
+}
+
+
+}}} // namespace qpid::client::amqp0_10

Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h Tue Aug 25 17:57:34 2009
@@ -0,0 +1,68 @@
+#ifndef QPID_CLIENT_AMQP0_10_ADDRESSRESOLUTION_H
+#define QPID_CLIENT_AMQP0_10_ADDRESSRESOLUTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/Variant.h"
+#include "qpid/client/Session.h"
+
+namespace qpid {
+
+namespace framing{
+class ReplyTo;
+}
+
+namespace messaging {
+class Address;
+class Filter;
+}
+
+namespace client {
+namespace amqp0_10 {
+
+class MessageSource;
+class MessageSink;
+
+/**
+ * Maps from a generic Address and optional Filter to an AMQP 0-10
+ * MessageSource which will then be used by a ReceiverImpl instance
+ * created for the address.
+ */
+class AddressResolution
+{
+  public:
+    std::auto_ptr<MessageSource> resolveSource(qpid::client::Session session,
+                                               const qpid::messaging::Address& address, 
+                                               const qpid::messaging::Filter* filter,
+                                               const qpid::messaging::Variant::Map& options);
+
+    std::auto_ptr<MessageSink> resolveSink(qpid::client::Session session,
+                                           const qpid::messaging::Address& address, 
+                                           const qpid::messaging::Variant::Map& options);
+
+    static qpid::messaging::Address convert(const qpid::framing::ReplyTo&);
+    static qpid::framing::ReplyTo convert(const qpid::messaging::Address&);
+
+  private:
+};
+}}} // namespace qpid::client::amqp0_10
+
+#endif  /*!QPID_CLIENT_AMQP0_10_ADDRESSRESOLUTION_H*/

Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/Codecs.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/Codecs.cpp?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/Codecs.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/Codecs.cpp Tue Aug 25 17:57:34 2009
@@ -0,0 +1,299 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/amqp0_10/Codecs.h"
+#include "qpid/messaging/Variant.h"
+#include "qpid/framing/Array.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/FieldValue.h"
+#include "qpid/framing/List.h"
+#include <algorithm>
+#include <functional>
+
+using namespace qpid::framing;
+using namespace qpid::messaging;
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+namespace {
+const std::string iso885915("iso-8859-15");
+const std::string utf8("utf8");
+const std::string utf16("utf16");
+const std::string amqp0_10_binary("amqp0-10:binary");
+const std::string amqp0_10_bit("amqp0-10:bit");
+const std::string amqp0_10_datetime("amqp0-10:datetime");
+const std::string amqp0_10_struct("amqp0-10:struct");
+}
+
+template <class T, class U, class F> void convert(const T& from, U& to, F f)
+{
+    std::transform(from.begin(), from.end(), std::inserter(to, to.begin()), f);
+}
+
+Variant::Map::value_type toVariantMapEntry(const FieldTable::value_type& in);
+FieldTable::value_type toFieldTableEntry(const Variant::Map::value_type& in);
+Variant toVariant(boost::shared_ptr<FieldValue> in);
+boost::shared_ptr<FieldValue> toFieldValue(const Variant& in);
+
+template <class T, class U, class F> void translate(boost::shared_ptr<FieldValue> in, U& u, F f) 
+{
+    T t;
+    getEncodedValue<T>(in, t);
+    convert(t, u, f);
+}
+
+template <class T, class U, class F> T* toFieldValueCollection(const U& u, F f) 
+{
+    typename T::ValueType t;
+    convert(u, t, f);
+    return new T(t);
+}
+
+FieldTableValue* toFieldTableValue(const Variant::Map& map) 
+{
+    FieldTable ft;
+    convert(map, ft, &toFieldTableEntry);
+    return new FieldTableValue(ft);
+}
+
+ListValue* toListValue(const Variant::List& list) 
+{
+    List l;
+    convert(list, l, &toFieldValue);
+    return new ListValue(l);
+}
+
+void setEncodingFor(Variant& out, uint8_t code)
+{
+    switch(code){
+      case 0x80: 
+      case 0x90: 
+      case 0xa0:
+        out.setEncoding(amqp0_10_binary);
+        break;
+      case 0x84:
+      case 0x94:
+        out.setEncoding(iso885915);
+        break;
+      case 0x85:
+      case 0x95:
+        out.setEncoding(utf8);
+        break;
+      case 0x86:
+      case 0x96: 
+        out.setEncoding(utf16);
+        break;
+      case 0xab: 
+        out.setEncoding(amqp0_10_struct);
+        break;
+      default:
+        //do nothing
+        break;
+    }
+}
+
+Variant toVariant(boost::shared_ptr<FieldValue> in)
+{
+    Variant out;
+    //based on AMQP 0-10 typecode, pick most appropriate variant type
+    switch (in->getType()) {
+        //Fixed Width types:
+      case 0x01: out.setEncoding(amqp0_10_binary);
+      case 0x02: out = in->getIntegerValue<int8_t, 1>(); break;
+      case 0x03: out = in->getIntegerValue<uint8_t, 1>(); break;
+      case 0x04: break; //TODO: iso-8859-15 char
+      case 0x08: out = in->getIntegerValue<bool, 1>(); break;
+      case 0x010: out.setEncoding(amqp0_10_binary);
+      case 0x011: out = in->getIntegerValue<int16_t, 2>(); break;
+      case 0x012: out = in->getIntegerValue<uint16_t, 2>(); break;
+      case 0x020: out.setEncoding(amqp0_10_binary);
+      case 0x021: out = in->getIntegerValue<int32_t, 4>(); break;
+      case 0x022: out = in->getIntegerValue<uint32_t, 4>(); break;
+      case 0x023: out = in->get<float>(); break;
+      case 0x027: break; //TODO: utf-32 char
+      case 0x030: out.setEncoding(amqp0_10_binary);
+      case 0x031: out = in->getIntegerValue<int64_t, 8>(); break;
+      case 0x038: out.setEncoding(amqp0_10_datetime); //treat datetime as uint64_t, but set encoding
+      case 0x032: out = in->getIntegerValue<uint64_t, 8>(); break;
+      case 0x033:out = in->get<double>(); break;
+
+        //TODO: figure out whether and how to map values with codes 0x40-0xd8
+
+      case 0xf0: break;//void, which is the default value for Variant
+      case 0xf1: out.setEncoding(amqp0_10_bit); break;//treat 'bit' as void, which is the default value for Variant
+        
+        //Variable Width types:
+        //strings:
+      case 0x80: 
+      case 0x84:
+      case 0x85:
+      case 0x86:
+      case 0x90:
+      case 0x94:
+      case 0x95:
+      case 0x96: 
+      case 0xa0:
+      case 0xab:
+        setEncodingFor(out, in->getType());
+        out = in->get<std::string>();
+        break;
+
+      case 0xa8:
+        out = Variant::Map();
+        translate<FieldTable>(in, out.asMap(), &toVariantMapEntry);
+        break;
+
+      case 0xa9:
+        out = Variant::List();
+        translate<List>(in, out.asList(), &toVariant);
+        break;
+      case 0xaa: //convert amqp0-10 array into variant list
+        out = Variant::List();
+        translate<Array>(in, out.asList(), &toVariant);
+        break;
+
+      default:
+        //error?
+        break;
+    }
+    return out;
+}
+
+boost::shared_ptr<FieldValue> toFieldValue(const Variant& in)
+{
+    boost::shared_ptr<FieldValue> out;
+    switch (in.getType()) {
+      case VOID: out = boost::shared_ptr<FieldValue>(new VoidValue()); break;
+      case BOOL: out = boost::shared_ptr<FieldValue>(new BoolValue(in.asBool())); break;
+      case UINT8: out = boost::shared_ptr<FieldValue>(new Unsigned8Value(in.asUint8())); break;
+      case UINT16: out = boost::shared_ptr<FieldValue>(new Unsigned16Value(in.asUint16())); break;
+      case UINT32: out = boost::shared_ptr<FieldValue>(new Unsigned32Value(in.asUint32())); break;
+      case UINT64: out = boost::shared_ptr<FieldValue>(new Unsigned64Value(in.asUint64())); break;
+      case INT8: out = boost::shared_ptr<FieldValue>(new Integer8Value(in.asInt8())); break;
+      case INT16: out = boost::shared_ptr<FieldValue>(new Integer16Value(in.asInt16())); break;
+      case INT32: out = boost::shared_ptr<FieldValue>(new Integer32Value(in.asInt32())); break;
+      case INT64: out = boost::shared_ptr<FieldValue>(new Integer64Value(in.asInt64())); break;
+      case FLOAT: out = boost::shared_ptr<FieldValue>(new FloatValue(in.asFloat())); break;
+      case DOUBLE: out = boost::shared_ptr<FieldValue>(new DoubleValue(in.asDouble())); break;
+        //TODO: check encoding (and length?) when deciding what AMQP type to treat string as
+      case STRING: out = boost::shared_ptr<FieldValue>(new Str16Value(in.asString())); break;
+      case MAP: 
+        //out = boost::shared_ptr<FieldValue>(toFieldValueCollection<FieldTableValue>(in.asMap(), &toFieldTableEntry));
+        out = boost::shared_ptr<FieldValue>(toFieldTableValue(in.asMap()));
+        break;
+      case LIST: 
+        //out = boost::shared_ptr<FieldValue>(toFieldValueCollection<ListValue>(in.asList(), &toFieldValue));
+        out = boost::shared_ptr<FieldValue>(toListValue(in.asList()));
+        break;
+    }
+    return out;
+}
+
+Variant::Map::value_type toVariantMapEntry(const FieldTable::value_type& in)
+{
+    return Variant::Map::value_type(in.first, toVariant(in.second));
+}
+
+FieldTable::value_type toFieldTableEntry(const Variant::Map::value_type& in)
+{
+    return FieldTable::value_type(in.first, toFieldValue(in.second));
+}
+
+struct EncodeBuffer
+{
+    char* data;
+    Buffer buffer;
+
+    EncodeBuffer(size_t size) : data(new char[size]), buffer(data, size) {}
+    ~EncodeBuffer() { delete[] data; }
+
+    template <class T> void encode(T& t) { t.encode(buffer); }
+
+    void getData(std::string& s) { 
+        s.assign(data, buffer.getSize()); 
+    }
+};
+
+struct DecodeBuffer
+{
+    Buffer buffer;
+
+    DecodeBuffer(const std::string& s) : buffer(const_cast<char*>(s.data()), s.size()) {}
+
+    template <class T> void decode(T& t) { t.decode(buffer); }
+    
+};
+
+template <class T, class U, class F> void _encode(const U& value, std::string& data, F f)
+{
+    T t;
+    convert(value, t, f);
+    EncodeBuffer buffer(t.encodedSize());
+    buffer.encode(t);
+    buffer.getData(data);
+}
+
+template <class T, class U, class F> void _decode(const std::string& data, U& value, F f)
+{
+    T t;
+    DecodeBuffer buffer(data);
+    buffer.decode(t);
+    convert(t, value, f);
+}
+
+void MapCodec::encode(const Variant& value, std::string& data)
+{
+    _encode<FieldTable>(value.asMap(), data, &toFieldTableEntry);
+}
+
+void MapCodec::decode(const std::string& data, Variant& value)
+{
+    value = Variant::Map();
+    _decode<FieldTable>(data, value.asMap(), &toVariantMapEntry);
+}
+
+void ListCodec::encode(const Variant& value, std::string& data)
+{
+    _encode<List>(value.asList(), data, &toFieldValue);
+}
+
+void ListCodec::decode(const std::string& data, Variant& value)
+{
+    value = Variant::List();
+    _decode<List>(data, value.asList(), &toVariant);
+}
+
+void translate(const Variant::Map& from, FieldTable& to)
+{
+    convert(from, to, &toFieldTableEntry);
+}
+
+void translate(const FieldTable& from, Variant::Map& to)
+{
+    convert(from, to, &toVariantMapEntry);
+}
+
+const std::string ListCodec::contentType("amqp0_10/list");
+const std::string MapCodec::contentType("amqp0_10/map");
+
+}}} // namespace qpid::client::amqp0_10

Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.cpp?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.cpp Tue Aug 25 17:57:34 2009
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "CompletionTracker.h"
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+using qpid::framing::SequenceNumber;
+
+void CompletionTracker::track(SequenceNumber command, void* token)
+{
+    tokens[command] = token;
+}
+
+void CompletionTracker::completedTo(SequenceNumber command)
+{
+    Tokens::iterator i = tokens.lower_bound(command);
+    if (i != tokens.end()) {
+        lastCompleted = i->second;
+        tokens.erase(tokens.begin(), ++i);
+    }
+}
+
+void* CompletionTracker::getLastCompletedToken()
+{
+    return lastCompleted;
+}
+
+}}} // namespace qpid::client::amqp0_10

Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.h?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.h Tue Aug 25 17:57:34 2009
@@ -0,0 +1,50 @@
+#ifndef QPID_CLIENT_AMQP0_10_COMPLETIONTRACKER_H
+#define QPID_CLIENT_AMQP0_10_COMPLETIONTRACKER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/SequenceNumber.h"
+#include <map>
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+/**
+ * Provides a mapping from command ids to application supplied
+ * 'tokens', and is used to determine when the sending or
+ * acknowledging of a specific message is complete.
+ */
+class CompletionTracker
+{
+  public:
+    void track(qpid::framing::SequenceNumber command, void* token);
+    void completedTo(qpid::framing::SequenceNumber command);
+    void* getLastCompletedToken();
+  private:
+    typedef std::map<qpid::framing::SequenceNumber, void*> Tokens;
+    Tokens tokens;
+    void* lastCompleted;
+};
+}}} // namespace qpid::client::amqp0_10
+
+#endif  /*!QPID_CLIENT_AMQP0_10_COMPLETIONTRACKER_H*/

Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp Tue Aug 25 17:57:34 2009
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ConnectionImpl.h"
+#include "SessionImpl.h"
+#include "qpid/messaging/Session.h"
+#include "qpid/client/ConnectionSettings.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+using qpid::messaging::Variant;
+
+template <class T> void setIfFound(const Variant::Map& map, const std::string& key, T& value)
+{
+    Variant::Map::const_iterator i = map.find(key);
+    if (i != map.end()) {
+        value = (T) i->second;
+    }
+}
+
+void convert(const Variant::Map& from, ConnectionSettings& to)
+{
+    setIfFound(from, "username", to.username);
+    setIfFound(from, "password", to.password);
+    setIfFound(from, "sasl-mechanism", to.mechanism);
+    setIfFound(from, "sasl-service", to.service);
+    setIfFound(from, "sasl-min-ssf", to.minSsf);
+    setIfFound(from, "sasl-max-ssf", to.maxSsf);
+
+    setIfFound(from, "heartbeat", to.heartbeat);
+    setIfFound(from, "tcp-nodelay", to.tcpNoDelay);
+
+    setIfFound(from, "locale", to.locale);
+    setIfFound(from, "max-channels", to.maxChannels);
+    setIfFound(from, "max-frame-size", to.maxFrameSize);
+    setIfFound(from, "bounds", to.bounds);
+}
+
+ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options)
+{
+    QPID_LOG(debug, "Opening connection to " << url << " with " << options);
+    Url u(url);
+    ConnectionSettings settings;
+    convert(options, settings);
+    connection.open(u, settings);
+}
+
+void ConnectionImpl::close()
+{
+    connection.close();
+}
+
+qpid::messaging::Session ConnectionImpl::newSession()
+{
+    qpid::messaging::Session impl(new SessionImpl(connection.newSession()));
+    return impl;
+}
+
+}}} // namespace qpid::client::amqp0_10

Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h Tue Aug 25 17:57:34 2009
@@ -0,0 +1,43 @@
+#ifndef QPID_CLIENT_AMQP0_10_CONNECTIONIMPL_H
+#define QPID_CLIENT_AMQP0_10_CONNECTIONIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/ConnectionImpl.h"
+#include "qpid/messaging/Variant.h"
+#include "qpid/client/Connection.h"
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+class ConnectionImpl : public qpid::messaging::ConnectionImpl
+{
+  public:
+    ConnectionImpl(const std::string& url, const qpid::messaging::Variant::Map& options);
+    void close();
+    qpid::messaging::Session newSession();
+  private:
+    qpid::client::Connection connection;
+};
+}}} // namespace qpid::client::amqp0_10
+
+#endif  /*!QPID_CLIENT_AMQP0_10_CONNECTIONIMPL_H*/

Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp Tue Aug 25 17:57:34 2009
@@ -0,0 +1,241 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/amqp0_10/IncomingMessages.h"
+#include "qpid/client/amqp0_10/AddressResolution.h"
+#include "qpid/client/amqp0_10/Codecs.h"
+#include "qpid/client/SessionImpl.h"
+#include "qpid/client/SessionBase_0_10Access.h"
+#include "qpid/log/Statement.h"
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/Variant.h"
+#include "qpid/framing/DeliveryProperties.h"
+#include "qpid/framing/FrameSet.h"
+#include "qpid/framing/MessageProperties.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/enum.h"
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+using namespace qpid::framing;
+using namespace qpid::framing::message;
+using qpid::sys::AbsTime;
+using qpid::sys::Duration;
+using qpid::messaging::Variant;
+
+namespace {
+const std::string EMPTY_STRING;
+
+
+struct GetNone : IncomingMessages::Handler
+{
+    bool accept(IncomingMessages::MessageTransfer&) { return false; }
+};
+
+struct GetAny : IncomingMessages::Handler
+{
+    bool accept(IncomingMessages::MessageTransfer& transfer)
+    { 
+        transfer.retrieve(0);
+        return true;
+    }
+};
+
+struct MatchAndTrack
+{
+    const std::string destination;
+    SequenceSet ids;
+
+    MatchAndTrack(const std::string& d) : destination(d) {}
+
+    bool operator()(boost::shared_ptr<qpid::framing::FrameSet> command)
+    {
+        if (command->as<MessageTransferBody>()->getDestination() == destination) {
+            ids.add(command->getId());
+            return true;
+        } else {
+            return false;
+        }
+    }
+};
+}
+
+IncomingMessages::IncomingMessages(qpid::client::AsyncSession s) : 
+    session(s), 
+    incoming(SessionBase_0_10Access(session).get()->getDemux().getDefault()) {}
+
+bool IncomingMessages::get(Handler& handler, Duration timeout)
+{
+    //search through received list for any transfer of interest:
+    for (FrameSetQueue::iterator i = received.begin(); i != received.end(); i++)
+    {
+        MessageTransfer transfer(*i, *this);
+        if (handler.accept(transfer)) {
+            received.erase(i);
+            return true;
+        }
+    }
+    //none found, check incoming:
+    return process(&handler, timeout);
+}
+
+void IncomingMessages::accept()
+{
+    session.messageAccept(unaccepted);
+    unaccepted.clear();
+}
+
+void IncomingMessages::releaseAll()
+{
+    //first process any received messages...
+    while (!received.empty()) {
+        retrieve(received.front(), 0);
+        received.pop_front();
+    }
+    //then pump out any available messages from incoming queue...
+    GetAny handler;
+    while (process(&handler, 0));
+    //now release all messages
+    session.messageRelease(unaccepted);
+    unaccepted.clear();
+}
+
+void IncomingMessages::releasePending(const std::string& destination)
+{
+    //first pump all available messages from incoming to received...
+    while (process(0, 0));
+
+    //now remove all messages for this destination from received list, recording their ids...
+    MatchAndTrack match(destination);
+    for (FrameSetQueue::iterator i = received.begin(); i != received.end(); i = match(*i) ? received.erase(i) : ++i);
+    //now release those messages
+    session.messageRelease(match.ids);
+}
+
+/**
+ * Get a frameset from session queue, waiting for up to the specified
+ * duration and returning true if this could be achieved, false
+ * otherwise. If a destination is supplied, only return a message for
+ * that destination. In this case messages from other destinations
+ * will be held on a received queue.
+ */
+bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
+{
+    AbsTime deadline(AbsTime::now(), duration);
+    FrameSet::shared_ptr content;
+    for (Duration timeout = duration; incoming->pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) {
+        if (content->isA<MessageTransferBody>()) {
+            MessageTransfer transfer(content, *this);
+            if (handler && handler->accept(transfer)) {
+                QPID_LOG(debug, "Delivered " << *content->getMethod());
+                return true;
+            } else {
+                //received message for another destination, keep for later
+                QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue");
+                received.push_back(content);
+            }
+        } else {
+            //TODO: handle other types of commands (e.g. message-accept, message-flow etc)
+        }
+    }
+    return false;
+}
+
+void populate(qpid::messaging::Message& message, FrameSet& command);
+
+/**
+ * Called when message is retrieved; records retrieval for subsequent
+ * acceptance, marks the command as completed and converts command to
+ * message if message is required
+ */
+void IncomingMessages::retrieve(FrameSetPtr command, qpid::messaging::Message* message)
+{
+    if (message) {
+        populate(*message, *command);
+    }
+    const MessageTransferBody* transfer = command->as<MessageTransferBody>(); 
+    if (transfer->getAcquireMode() == ACQUIRE_MODE_PRE_ACQUIRED && transfer->getAcceptMode() == ACCEPT_MODE_EXPLICIT) {
+        unaccepted.add(command->getId());
+    }
+    session.markCompleted(command->getId(), false, false);
+}
+
+IncomingMessages::MessageTransfer::MessageTransfer(FrameSetPtr c, IncomingMessages& p) : content(c), parent(p) {}
+
+const std::string& IncomingMessages::MessageTransfer::getDestination()
+{
+    return content->as<MessageTransferBody>()->getDestination();
+}
+void IncomingMessages::MessageTransfer::retrieve(qpid::messaging::Message* message)
+{
+    parent.retrieve(content, message);
+}
+
+void translate(const FieldTable& from, Variant::Map& to);//implemented in Codecs.cpp
+
+void populateHeaders(qpid::messaging::Message& message, 
+                     const DeliveryProperties* deliveryProperties, 
+                     const MessageProperties* messageProperties)
+{
+    if (deliveryProperties) {
+        message.setSubject(deliveryProperties->getRoutingKey());
+        //TODO: convert other delivery properties
+    }
+    if (messageProperties) {
+        message.setContentType(messageProperties->getContentType());
+        if (messageProperties->hasReplyTo()) {
+            message.setReplyTo(AddressResolution::convert(messageProperties->getReplyTo()));
+        }
+        translate(messageProperties->getApplicationHeaders(), message.getHeaders());
+        //TODO: convert other message properties
+    }
+}
+
+void populateHeaders(qpid::messaging::Message& message, const AMQHeaderBody* headers)
+{
+    populateHeaders(message, headers->get<DeliveryProperties>(), headers->get<MessageProperties>());
+}
+
+void populate(qpid::messaging::Message& message, FrameSet& command)
+{
+    //need to be able to link the message back to the transfer it was delivered by
+    //e.g. for rejecting. TODO: hide this from API
+    uint32_t commandId = command.getId();
+    message.setInternalId(reinterpret_cast<void*>(commandId));
+        
+    command.getContent(message.getBytes());
+
+    populateHeaders(message, command.getHeaders());
+        
+    //decode content if necessary
+    if (message.getContentType() == ListCodec::contentType) {
+        ListCodec codec;
+        message.decode(codec);
+    } else if (message.getContentType() == MapCodec::contentType) {
+        MapCodec codec;
+        message.decode(codec);
+    }
+}
+
+
+}}} // namespace qpid::client::amqp0_10

Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h Tue Aug 25 17:57:34 2009
@@ -0,0 +1,91 @@
+#ifndef QPID_CLIENT_AMQP0_10_INCOMINGMESSAGES_H
+#define QPID_CLIENT_AMQP0_10_INCOMINGMESSAGES_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include <boost/shared_ptr.hpp>
+#include "qpid/client/AsyncSession.h"
+#include "qpid/framing/SequenceSet.h"
+#include "qpid/sys/BlockingQueue.h"
+#include "qpid/sys/Time.h"
+
+namespace qpid {
+
+namespace framing{
+class FrameSet;
+}
+
+namespace messaging {
+class Message;
+}
+
+namespace client {
+namespace amqp0_10 {
+
+/**
+ * 
+ */
+class IncomingMessages
+{
+  public:
+    typedef boost::shared_ptr<qpid::framing::FrameSet> FrameSetPtr;
+    class MessageTransfer
+    {
+      public:
+        const std::string& getDestination();
+        void retrieve(qpid::messaging::Message* message);
+      private:
+        FrameSetPtr content;
+        IncomingMessages& parent;
+
+        MessageTransfer(FrameSetPtr, IncomingMessages&);
+      friend class IncomingMessages;
+    };
+
+    struct Handler
+    {
+        virtual ~Handler() {}
+        virtual bool accept(MessageTransfer& transfer) = 0;
+    };
+
+    IncomingMessages(qpid::client::AsyncSession session);
+    bool get(Handler& handler, qpid::sys::Duration timeout);
+    //bool get(qpid::messaging::Message& message, qpid::sys::Duration timeout);
+    //bool get(const std::string& destination, qpid::messaging::Message& message, qpid::sys::Duration timeout);
+    void accept();
+    void releaseAll();
+    void releasePending(const std::string& destination);
+  private:
+    typedef std::deque<FrameSetPtr> FrameSetQueue;
+
+    qpid::client::AsyncSession session;
+    qpid::framing::SequenceSet unaccepted;
+    boost::shared_ptr< sys::BlockingQueue<FrameSetPtr> > incoming;
+    FrameSetQueue received;
+
+    bool process(Handler*, qpid::sys::Duration);
+    void retrieve(FrameSetPtr, qpid::messaging::Message*);
+
+};
+}}} // namespace qpid::client::amqp0_10
+
+#endif  /*!QPID_CLIENT_AMQP0_10_INCOMINGMESSAGES_H*/

Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h Tue Aug 25 17:57:34 2009
@@ -0,0 +1,50 @@
+#ifndef QPID_CLIENT_AMQP0_10_MESSAGESINK_H
+#define QPID_CLIENT_AMQP0_10_MESSAGESINK_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include "qpid/client/AsyncSession.h"
+
+namespace qpid {
+
+namespace messaging {
+class Message;
+}
+
+namespace client {
+namespace amqp0_10 {
+
+/**
+ *
+ */
+class MessageSink
+{
+  public:
+    virtual ~MessageSink() {}
+    virtual void declare(qpid::client::AsyncSession& session, const std::string& name) = 0;
+    virtual void send(qpid::client::AsyncSession& session, const std::string& name, qpid::messaging::Message& message) = 0;
+    virtual void cancel(qpid::client::AsyncSession& session, const std::string& name) = 0;
+  private:
+};
+}}} // namespace qpid::client::amqp0_10
+
+#endif  /*!QPID_CLIENT_AMQP0_10_MESSAGESINK_H*/

Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSource.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSource.h?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSource.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSource.h Tue Aug 25 17:57:34 2009
@@ -0,0 +1,47 @@
+#ifndef QPID_CLIENT_AMQP0_10_MESSAGESOURCE_H
+#define QPID_CLIENT_AMQP0_10_MESSAGESOURCE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include "qpid/client/AsyncSession.h"
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+/**
+ * Abstraction behind which the AMQP 0-10 commands required to
+ * establish (and tear down) an incoming stream of messages from a
+ * given address are hidden.
+ */
+class MessageSource
+{
+  public:
+    virtual ~MessageSource() {}
+    virtual void subscribe(qpid::client::AsyncSession& session, const std::string& destination) = 0;    
+    virtual void cancel(qpid::client::AsyncSession& session, const std::string& destination) = 0;
+    
+  private:
+};
+}}} // namespace qpid::client::amqp0_10
+
+#endif  /*!QPID_CLIENT_AMQP0_10_MESSAGESOURCE_H*/

Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp Tue Aug 25 17:57:34 2009
@@ -0,0 +1,146 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ReceiverImpl.h"
+#include "MessageSource.h"
+#include "SessionImpl.h"
+#include "qpid/messaging/MessageListener.h"
+#include "qpid/messaging/Receiver.h"
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+using qpid::messaging::Receiver;
+
+void ReceiverImpl::received(qpid::messaging::Message&)
+{
+    //TODO: should this be configurable
+    if (capacity && --window <= capacity/2) {
+        session.sendCompletion();
+        window = capacity;
+    }
+}
+
+bool ReceiverImpl::get(qpid::messaging::Message& message, qpid::sys::Duration timeout)
+{
+    return parent.get(*this, message, timeout);
+}
+    
+qpid::messaging::Message ReceiverImpl::get(qpid::sys::Duration timeout) 
+{
+    qpid::messaging::Message result;
+    if (!get(result, timeout)) throw Receiver::NoMessageAvailable();
+    return result;
+}
+
+bool ReceiverImpl::fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout)
+{
+    if (capacity == 0 && !cancelled) {
+        session.messageFlow(destination, CREDIT_UNIT_MESSAGE, 1);
+        if (!started) session.messageFlow(destination, CREDIT_UNIT_BYTE, byteCredit);
+    }
+    
+    if (get(message, timeout)) {
+        return true;
+    } else {
+        if (!cancelled) {
+            sync(session).messageFlush(destination);
+            start();//reallocate credit
+        }
+        return get(message, 0);
+    }
+}
+    
+qpid::messaging::Message ReceiverImpl::fetch(qpid::sys::Duration timeout) 
+{
+    qpid::messaging::Message result;
+    if (!fetch(result, timeout)) throw Receiver::NoMessageAvailable();
+    return result;
+}
+
+void ReceiverImpl::cancel() 
+{ 
+    if (!cancelled) {
+        //TODO: should syncronicity be an optional argument to this call?
+        source->cancel(session, destination);
+        //need to be sure cancel is complete and all incoming
+        //framesets are processed before removing the receiver
+        parent.receiverCancelled(destination);
+        cancelled = true;
+    }
+}
+
+void ReceiverImpl::start()
+{
+    if (!cancelled) {
+        started = true;
+        session.messageSetFlowMode(destination, capacity > 0);
+        session.messageFlow(destination, CREDIT_UNIT_MESSAGE, capacity);
+        session.messageFlow(destination, CREDIT_UNIT_BYTE, byteCredit);
+        window = capacity;
+    }
+}
+
+void ReceiverImpl::stop()
+{
+    session.messageStop(destination);
+    started = false;
+}
+
+void ReceiverImpl::subscribe()
+{
+    source->subscribe(session, destination);
+}
+
+void ReceiverImpl::setSession(qpid::client::AsyncSession s) 
+{ 
+    session = s;
+    if (!cancelled) {
+        subscribe();
+        //if we were in started state before the session was changed,
+        //start again on this new session
+        //TODO: locking if receiver is to be threadsafe...
+        if (started) start();
+    }
+}
+
+void ReceiverImpl::setCapacity(uint32_t c)
+{
+    if (c != capacity) {
+        capacity = c;
+        if (!cancelled && started) {
+            stop();
+            start();
+        }
+    }
+}
+
+void ReceiverImpl::setListener(qpid::messaging::MessageListener* l) { listener = l; }
+qpid::messaging::MessageListener* ReceiverImpl::getListener() { return listener; }
+
+const std::string& ReceiverImpl::getName() const { return destination; }
+
+ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, std::auto_ptr<MessageSource> s) : 
+    parent(p), source(s), destination(name), byteCredit(0xFFFFFFFF), 
+    capacity(0), started(false), cancelled(false), listener(0), window(0) {}
+
+
+}}} // namespace qpid::client::amqp0_10

Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h Tue Aug 25 17:57:34 2009
@@ -0,0 +1,76 @@
+#ifndef QPID_CLIENT_AMQP0_10_RECEIVERIMPL_H
+#define QPID_CLIENT_AMQP0_10_RECEIVERIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/ReceiverImpl.h"
+#include "qpid/client/AsyncSession.h"
+#include "qpid/sys/Time.h"
+#include <memory>
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+class MessageSource;
+class SessionImpl;
+
+/**
+ * A receiver implementation based on an AMQP 0-10 subscription.
+ */
+class ReceiverImpl : public qpid::messaging::ReceiverImpl
+{
+  public:
+
+    ReceiverImpl(SessionImpl& parent, const std::string& name, std::auto_ptr<MessageSource> source);
+
+    bool get(qpid::messaging::Message& message, qpid::sys::Duration timeout);
+    qpid::messaging::Message get(qpid::sys::Duration timeout);
+    bool fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout);
+    qpid::messaging::Message fetch(qpid::sys::Duration timeout);
+    void cancel();
+    void start();
+    void stop();
+    void subscribe();
+    void setSession(qpid::client::AsyncSession s);
+    const std::string& getName() const;
+    void setCapacity(uint32_t);
+    void setListener(qpid::messaging::MessageListener* listener);
+    qpid::messaging::MessageListener* getListener();
+    void received(qpid::messaging::Message& message);
+  private:
+    SessionImpl& parent;
+    const std::auto_ptr<MessageSource> source;
+    const std::string destination;
+    const uint32_t byteCredit;
+    
+    uint32_t capacity;
+    qpid::client::AsyncSession session;
+    bool started;
+    bool cancelled;
+    qpid::messaging::MessageListener* listener;
+    uint32_t window;
+};
+
+}}} // namespace qpid::client::amqp0_10
+
+#endif  /*!QPID_CLIENT_AMQP0_10_RECEIVERIMPL_H*/

Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp Tue Aug 25 17:57:34 2009
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "SenderImpl.h"
+#include "MessageSink.h"
+#include "SessionImpl.h"
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name, std::auto_ptr<MessageSink> _sink) : 
+    parent(_parent), name(_name), sink(_sink) {}
+
+void SenderImpl::send(qpid::messaging::Message& m) 
+{
+    sink->send(session, name, m);
+}
+
+void SenderImpl::cancel()
+{
+    sink->cancel(session, name);
+    parent.senderCancelled(name);
+}
+
+void SenderImpl::setSession(qpid::client::AsyncSession s)
+{
+    session = s;
+    sink->declare(session, name);
+}
+
+}}} // namespace qpid::client::amqp0_10

Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h Tue Aug 25 17:57:34 2009
@@ -0,0 +1,58 @@
+#ifndef QPID_CLIENT_AMQP0_10_SENDERIMPL_H
+#define QPID_CLIENT_AMQP0_10_SENDERIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/SenderImpl.h"
+#include "qpid/client/AsyncSession.h"
+#include <memory>
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+class MessageSink;
+class SessionImpl;
+
+/**
+ *
+ */
+class SenderImpl : public qpid::messaging::SenderImpl
+{
+  public:
+    SenderImpl(SessionImpl& parent, const std::string& name, std::auto_ptr<MessageSink> sink);
+    void send(qpid::messaging::Message&);
+    void cancel();
+    void setSession(qpid::client::AsyncSession);
+
+  private:
+    SessionImpl& parent;
+    const std::string name;
+    std::auto_ptr<MessageSink> sink;
+
+    qpid::client::AsyncSession session;
+    std::string destination;
+    std::string routingKey;
+};
+}}} // namespace qpid::client::amqp0_10
+
+#endif  /*!QPID_CLIENT_AMQP0_10_SENDERIMPL_H*/



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org