You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2009/08/25 19:57:37 UTC
svn commit: r807731 [2/3] - in /qpid/trunk/qpid/cpp: ./ examples/
examples/messaging/ include/qpid/client/amqp0_10/ include/qpid/messaging/
src/ src/qpid/client/ src/qpid/client/amqp0_10/ src/qpid/messaging/
src/tests/
Added: qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h (added)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h Tue Aug 25 17:57:34 2009
@@ -0,0 +1,99 @@
+#ifndef QPID_MESSAGING_SESSION_H
+#define QPID_MESSAGING_SESSION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/ClientImportExport.h"
+#include "qpid/client/Handle.h"
+#include "qpid/sys/Time.h"
+#include "Variant.h"
+
+namespace qpid {
+namespace client {
+
+template <class> class PrivateImplRef; // forward declaration; named in the friend declaration of messaging::Session
+
+}
+
+namespace messaging {
+
+class Address; // forward declarations: this header only names these types in signatures
+class Filter;
+class Message;
+class MessageListener; // not referenced by any declaration in this header
+class Sender;
+class Receiver;
+class SessionImpl;
+class Subscription; // not referenced by any declaration in this header
+
+/**
+ * A session represents a distinct 'conversation' which can involve
+ * sending and receiving messages from different sources and sinks.
+ *
+ * Session derives from qpid::client::Handle<SessionImpl>. Only
+ * declarations appear in this header; no member is defined inline.
+ * Every public member is marked QPID_CLIENT_EXTERN.
+ */
+class Session : public qpid::client::Handle<SessionImpl>
+{
+ public:
+ QPID_CLIENT_EXTERN Session(SessionImpl* impl = 0); // impl defaults to a null pointer
+ QPID_CLIENT_EXTERN Session(const Session&);
+ QPID_CLIENT_EXTERN ~Session();
+ QPID_CLIENT_EXTERN Session& operator=(const Session&);
+
+ QPID_CLIENT_EXTERN void close();
+
+ QPID_CLIENT_EXTERN void commit(); // NOTE(review): presumably commits transactional work on this session - confirm against the implementation
+ QPID_CLIENT_EXTERN void rollback(); // NOTE(review): presumably the counterpart of commit() - confirm
+
+ /**
+ * Acknowledges all outstanding messages that have been received
+ * by the application on this session.
+ */
+ QPID_CLIENT_EXTERN void acknowledge();
+ /**
+ * Rejects the specified message. This will prevent the message
+ * being redelivered.
+ */
+ QPID_CLIENT_EXTERN void reject(Message&);
+
+ QPID_CLIENT_EXTERN void sync();
+ QPID_CLIENT_EXTERN void flush();
+
+ QPID_CLIENT_EXTERN bool fetch(Message& message, qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE); // timeout defaults to TIME_INFINITE; NOTE(review): presumably fills `message` and returns false when nothing arrives in time - confirm
+ QPID_CLIENT_EXTERN Message fetch(qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE); // overload returning the message by value; behaviour on timeout is not visible here
+ QPID_CLIENT_EXTERN bool dispatch(qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE); // NOTE(review): presumably hands a message to a registered listener - confirm
+
+ QPID_CLIENT_EXTERN Sender createSender(const Address& address, const VariantMap& options = VariantMap()); // options defaults to an empty map
+ QPID_CLIENT_EXTERN Sender createSender(const std::string& address, const VariantMap& options = VariantMap()); // same, with the address given as a string
+
+ QPID_CLIENT_EXTERN Receiver createReceiver(const Address& address, const VariantMap& options = VariantMap()); // receiver without a filter
+ QPID_CLIENT_EXTERN Receiver createReceiver(const Address& address, const Filter& filter, const VariantMap& options = VariantMap()); // receiver with a filter
+ QPID_CLIENT_EXTERN Receiver createReceiver(const std::string& address, const VariantMap& options = VariantMap()); // string-address variant, no filter
+ QPID_CLIENT_EXTERN Receiver createReceiver(const std::string& address, const Filter& filter, const VariantMap& options = VariantMap()); // string-address variant with a filter
+
+ QPID_CLIENT_EXTERN Address createTempQueue(const std::string& baseName = std::string()); // baseName defaults to an empty string
+
+ QPID_CLIENT_EXTERN void* getLastConfirmedSent(); // NOTE(review): void* looks like a placeholder return type - confirm the intended type
+ QPID_CLIENT_EXTERN void* getLastConfirmedAcknowledged(); // NOTE(review): same placeholder concern as above
+ private:
+ friend class qpid::client::PrivateImplRef<Session>; // gives PrivateImplRef<Session> access to non-public members
+};
+}} // namespace qpid::messaging
+
+#endif /*!QPID_MESSAGING_SESSION_H*/
Added: qpid/trunk/qpid/cpp/include/qpid/messaging/Variant.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Variant.h?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Variant.h (added)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Variant.h Tue Aug 25 17:57:34 2009
@@ -0,0 +1,167 @@
+#ifndef QPID_MESSAGING_VARIANT_H
+#define QPID_MESSAGING_VARIANT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <list>
+#include <map>
+#include <ostream>
+#include <string>
+#include "qpid/Exception.h"
+#include "qpid/sys/IntegerTypes.h"
+#include "qpid/client/ClientImportExport.h"
+
+namespace qpid {
+namespace client {
+}
+
+namespace messaging {
+
+/**
+ * Thrown when an illegal conversion of a variant is attempted.
+ *
+ * Derives from qpid::Exception. The constructor is declared here but
+ * defined elsewhere; msg is presumably the error description (confirm).
+ */
+struct InvalidConversion : public qpid::Exception
+{
+ InvalidConversion(const std::string& msg);
+};
+
+enum VariantType { // type tag returned by Variant::getType(); NOTE(review): unscoped names such as VOID may collide with platform macros - confirm on all targets
+ VOID = 0, // only enumerator with an explicit value; the rest follow sequentially
+ BOOL,
+ UINT8,
+ UINT16,
+ UINT32,
+ UINT64,
+ INT8,
+ INT16,
+ INT32,
+ INT64,
+ FLOAT,
+ DOUBLE,
+ STRING,
+ MAP, // corresponds to Variant::Map
+ LIST // corresponds to Variant::List
+};
+
+class VariantImpl; // forward declaration; Variant only stores a pointer to it
+
+/**
+ * Represents a value of variable type.
+ *
+ * The kind of value currently held is reported by getType() as a
+ * VariantType. A Variant can be built from, assigned from and converted
+ * to each of the supported value types; nested Map and List values are
+ * supported through the typedefs below. State is kept behind a private
+ * VariantImpl pointer; all members are declared here and defined
+ * elsewhere.
+ */
+class Variant
+{
+ public:
+ typedef std::map<std::string, Variant> Map;
+ typedef std::list<Variant> List;
+
+ QPID_CLIENT_EXTERN Variant(); // NOTE(review): presumably yields a Variant of type VOID - confirm
+ QPID_CLIENT_EXTERN Variant(bool); // converting constructors: one per supported value type, none marked explicit
+ QPID_CLIENT_EXTERN Variant(uint8_t);
+ QPID_CLIENT_EXTERN Variant(uint16_t);
+ QPID_CLIENT_EXTERN Variant(uint32_t);
+ QPID_CLIENT_EXTERN Variant(uint64_t);
+ QPID_CLIENT_EXTERN Variant(int8_t);
+ QPID_CLIENT_EXTERN Variant(int16_t);
+ QPID_CLIENT_EXTERN Variant(int32_t);
+ QPID_CLIENT_EXTERN Variant(int64_t);
+ QPID_CLIENT_EXTERN Variant(float);
+ QPID_CLIENT_EXTERN Variant(double);
+ QPID_CLIENT_EXTERN Variant(const std::string&);
+ QPID_CLIENT_EXTERN Variant(const char*);
+ QPID_CLIENT_EXTERN Variant(const Map&);
+ QPID_CLIENT_EXTERN Variant(const List&);
+ QPID_CLIENT_EXTERN Variant(const Variant&);
+
+ QPID_CLIENT_EXTERN ~Variant();
+
+ QPID_CLIENT_EXTERN VariantType getType() const;
+
+ QPID_CLIENT_EXTERN Variant& operator=(bool); // assignment from each supported value type, mirroring the constructors
+ QPID_CLIENT_EXTERN Variant& operator=(uint8_t);
+ QPID_CLIENT_EXTERN Variant& operator=(uint16_t);
+ QPID_CLIENT_EXTERN Variant& operator=(uint32_t);
+ QPID_CLIENT_EXTERN Variant& operator=(uint64_t);
+ QPID_CLIENT_EXTERN Variant& operator=(int8_t);
+ QPID_CLIENT_EXTERN Variant& operator=(int16_t);
+ QPID_CLIENT_EXTERN Variant& operator=(int32_t);
+ QPID_CLIENT_EXTERN Variant& operator=(int64_t);
+ QPID_CLIENT_EXTERN Variant& operator=(float);
+ QPID_CLIENT_EXTERN Variant& operator=(double);
+ QPID_CLIENT_EXTERN Variant& operator=(const std::string&);
+ QPID_CLIENT_EXTERN Variant& operator=(const char*);
+ QPID_CLIENT_EXTERN Variant& operator=(const Map&);
+ QPID_CLIENT_EXTERN Variant& operator=(const List&);
+ QPID_CLIENT_EXTERN Variant& operator=(const Variant&);
+
+ QPID_CLIENT_EXTERN bool asBool() const; // as*() accessors return by value; NOTE(review): presumably convert where possible and throw InvalidConversion otherwise - confirm
+ QPID_CLIENT_EXTERN uint8_t asUint8() const;
+ QPID_CLIENT_EXTERN uint16_t asUint16() const;
+ QPID_CLIENT_EXTERN uint32_t asUint32() const;
+ QPID_CLIENT_EXTERN uint64_t asUint64() const;
+ QPID_CLIENT_EXTERN int8_t asInt8() const;
+ QPID_CLIENT_EXTERN int16_t asInt16() const;
+ QPID_CLIENT_EXTERN int32_t asInt32() const;
+ QPID_CLIENT_EXTERN int64_t asInt64() const;
+ QPID_CLIENT_EXTERN float asFloat() const;
+ QPID_CLIENT_EXTERN double asDouble() const;
+ QPID_CLIENT_EXTERN std::string asString() const;
+
+ QPID_CLIENT_EXTERN operator bool() const; // implicit conversion operators; NOTE(review): presumably equivalent to the matching as*() accessor - confirm
+ QPID_CLIENT_EXTERN operator uint8_t() const;
+ QPID_CLIENT_EXTERN operator uint16_t() const;
+ QPID_CLIENT_EXTERN operator uint32_t() const;
+ QPID_CLIENT_EXTERN operator uint64_t() const;
+ QPID_CLIENT_EXTERN operator int8_t() const;
+ QPID_CLIENT_EXTERN operator int16_t() const;
+ QPID_CLIENT_EXTERN operator int32_t() const;
+ QPID_CLIENT_EXTERN operator int64_t() const;
+ QPID_CLIENT_EXTERN operator float() const;
+ QPID_CLIENT_EXTERN operator double() const;
+ QPID_CLIENT_EXTERN operator const char*() const; // NOTE(review): lifetime of the returned pointer is not specified in this header - confirm
+
+ QPID_CLIENT_EXTERN const Map& asMap() const; // const and non-const access to a nested map
+ QPID_CLIENT_EXTERN Map& asMap();
+ QPID_CLIENT_EXTERN const List& asList() const; // const and non-const access to a nested list
+ QPID_CLIENT_EXTERN List& asList();
+ /**
+ * Unlike asString(), getString() will not do any conversions and
+ * will throw InvalidConversion if the type is not STRING.
+ */
+ QPID_CLIENT_EXTERN const std::string& getString() const;
+ QPID_CLIENT_EXTERN std::string& getString();
+
+ QPID_CLIENT_EXTERN void setEncoding(const std::string&);
+ QPID_CLIENT_EXTERN const std::string& getEncoding() const;
+
+ QPID_CLIENT_EXTERN void reset();
+ private:
+ VariantImpl* impl; // opaque implementation object; its definition is not visible in this header
+};
+
+QPID_CLIENT_EXTERN std::ostream& operator<<(std::ostream& out, const Variant& value); // stream insertion for a single Variant
+QPID_CLIENT_EXTERN std::ostream& operator<<(std::ostream& out, const Variant::Map& map); // stream insertion for a whole map
+QPID_CLIENT_EXTERN std::ostream& operator<<(std::ostream& out, const Variant::List& list); // stream insertion for a whole list
+
+typedef Variant::Map VariantMap; // namespace-level alias, used e.g. for option maps in Session.h
+
+}} // namespace qpid::messaging
+
+#endif /*!QPID_MESSAGING_VARIANT_H*/
Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=807731&r1=807730&r2=807731&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Tue Aug 25 17:57:34 2009
@@ -523,6 +523,35 @@
qpid/client/SubscriptionImpl.cpp
qpid/client/SubscriptionManager.cpp
qpid/client/SubscriptionManagerImpl.cpp
+ qpid/messaging/Address.cpp
+ qpid/messaging/Connection.cpp
+ qpid/messaging/ConnectionImpl.h
+ qpid/messaging/Filter.cpp
+ qpid/messaging/Message.cpp
+ qpid/messaging/Receiver.cpp
+ qpid/messaging/ReceiverImpl.h
+ qpid/messaging/Session.cpp
+ qpid/messaging/SessionImpl.h
+ qpid/messaging/Sender.cpp
+ qpid/messaging/SenderImpl.h
+ qpid/messaging/Variant.cpp
+ qpid/client/amqp0_10/AddressResolution.h
+ qpid/client/amqp0_10/AddressResolution.cpp
+ qpid/client/amqp0_10/Codecs.cpp
+ qpid/client/amqp0_10/CompletionTracker.h
+ qpid/client/amqp0_10/CompletionTracker.cpp
+ qpid/client/amqp0_10/ConnectionImpl.h
+ qpid/client/amqp0_10/ConnectionImpl.cpp
+ qpid/client/amqp0_10/IncomingMessages.h
+ qpid/client/amqp0_10/IncomingMessages.cpp
+ qpid/client/amqp0_10/MessageSink.h
+ qpid/client/amqp0_10/MessageSource.h
+ qpid/client/amqp0_10/ReceiverImpl.h
+ qpid/client/amqp0_10/ReceiverImpl.cpp
+ qpid/client/amqp0_10/SessionImpl.h
+ qpid/client/amqp0_10/SessionImpl.cpp
+ qpid/client/amqp0_10/SenderImpl.h
+ qpid/client/amqp0_10/SenderImpl.cpp
)
add_library (qpidclient SHARED ${libqpidclient_SOURCES})
target_link_libraries (qpidclient qpidcommon)
Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=807731&r1=807730&r2=807731&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Tue Aug 25 17:57:34 2009
@@ -681,7 +681,36 @@
qpid/client/SubscriptionImpl.h \
qpid/client/SubscriptionManager.cpp \
qpid/client/SubscriptionManagerImpl.cpp \
- qpid/client/SubscriptionManagerImpl.h
+ qpid/client/SubscriptionManagerImpl.h \
+ qpid/messaging/Address.cpp \
+ qpid/messaging/Connection.cpp \
+ qpid/messaging/Filter.cpp \
+ qpid/messaging/Message.cpp \
+ qpid/messaging/Sender.cpp \
+ qpid/messaging/Receiver.cpp \
+ qpid/messaging/Session.cpp \
+ qpid/messaging/Variant.cpp \
+ qpid/messaging/ConnectionImpl.h \
+ qpid/messaging/SenderImpl.h \
+ qpid/messaging/ReceiverImpl.h \
+ qpid/messaging/SessionImpl.h \
+ qpid/client/amqp0_10/AddressResolution.h \
+ qpid/client/amqp0_10/AddressResolution.cpp \
+ qpid/client/amqp0_10/Codecs.cpp \
+ qpid/client/amqp0_10/ConnectionImpl.h \
+ qpid/client/amqp0_10/ConnectionImpl.cpp \
+ qpid/client/amqp0_10/CompletionTracker.h \
+ qpid/client/amqp0_10/CompletionTracker.cpp \
+ qpid/client/amqp0_10/IncomingMessages.h \
+ qpid/client/amqp0_10/IncomingMessages.cpp \
+ qpid/client/amqp0_10/MessageSink.h \
+ qpid/client/amqp0_10/MessageSource.h \
+ qpid/client/amqp0_10/ReceiverImpl.h \
+ qpid/client/amqp0_10/ReceiverImpl.cpp \
+ qpid/client/amqp0_10/SessionImpl.h \
+ qpid/client/amqp0_10/SessionImpl.cpp \
+ qpid/client/amqp0_10/SenderImpl.h \
+ qpid/client/amqp0_10/SenderImpl.cpp
# NOTE: only public header files (which should be in ../include)
# should go in this list. Private headers should go in the SOURCES
@@ -751,7 +780,19 @@
../include/qpid/sys/SystemInfo.h \
../include/qpid/sys/Thread.h \
../include/qpid/sys/Time.h \
- ../include/qpid/sys/uuid.h
+ ../include/qpid/sys/uuid.h \
+ ../include/qpid/messaging/Address.h \
+ ../include/qpid/messaging/Connection.h \
+ ../include/qpid/messaging/Codec.h \
+ ../include/qpid/messaging/Filter.h \
+ ../include/qpid/messaging/Message.h \
+ ../include/qpid/messaging/MessageContent.h \
+ ../include/qpid/messaging/MessageListener.h \
+ ../include/qpid/messaging/Sender.h \
+ ../include/qpid/messaging/Receiver.h \
+ ../include/qpid/messaging/Session.h \
+ ../include/qpid/messaging/Variant.h \
+ ../include/qpid/client/amqp0_10/Codecs.h
# Force build of qpidd during dist phase so help2man will work.
dist-hook: $(BUILT_SOURCES)
Modified: qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=807731&r1=807730&r2=807731&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp Tue Aug 25 17:57:34 2009
@@ -202,6 +202,16 @@
return f.result;
}
+/**
+ * Returns the sequence number immediately preceding the first entry of
+ * incompleteIn. The set is read while holding the state lock.
+ *
+ * NOTE(review): incompleteIn.front() is called unconditionally; confirm
+ * that the set can never be empty when this is invoked.
+ */
+framing::SequenceNumber SessionImpl::getCompleteUpTo()
+{
+    // The lock is held for the whole body; the decrement only touches a local copy.
+    Lock l(state);
+    SequenceNumber upTo = incompleteIn.front();
+    return --upTo;
+}
+
struct MarkCompleted
{
const SequenceNumber& id;
Modified: qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h?rev=807731&r1=807730&r2=807731&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h Tue Aug 25 17:57:34 2009
@@ -103,6 +103,7 @@
void markCompleted(const framing::SequenceSet& ids, bool notifyPeer);
bool isComplete(const framing::SequenceNumber& id);
bool isCompleteUpTo(const framing::SequenceNumber& id);
+ framing::SequenceNumber getCompleteUpTo(); // returns the sequence number just before the first entry of incompleteIn
void waitForCompletion(const framing::SequenceNumber& id);
void sendCompletion();
void sendFlush();
Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp Tue Aug 25 17:57:34 2009
@@ -0,0 +1,464 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/amqp0_10/AddressResolution.h"
+#include "qpid/client/amqp0_10/Codecs.h"
+#include "qpid/client/amqp0_10/MessageSource.h"
+#include "qpid/client/amqp0_10/MessageSink.h"
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/Filter.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/Exception.h"
+#include "qpid/log/Statement.h"
+#include "qpid/framing/enum.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/ReplyTo.h"
+#include "qpid/framing/reply_exceptions.h"
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+using qpid::Exception;
+using qpid::messaging::Address;
+using qpid::messaging::Filter;
+using qpid::messaging::Variant;
+using qpid::framing::FieldTable;
+using qpid::framing::ReplyTo;
+using namespace qpid::framing::message;
+
+
+namespace {
+// Shared empty values, used as default arguments and as the result of
+// getOption() when a key is absent.
+const Variant EMPTY_VARIANT;
+const FieldTable EMPTY_FIELD_TABLE;
+const std::string EMPTY_STRING;
+
+// Keys looked up in the options map by resolveSource().
+const std::string BROWSE = "browse";
+const std::string EXCLUSIVE = "exclusive";
+const std::string MODE = "mode";
+const std::string NAME = "name";
+const std::string UNACKNOWLEDGED = "unacknowledged";
+
+// Address type tags, and the divider that separates exchange from
+// subject in the value of a "topic+" address.
+const std::string QUEUE_ADDRESS = "queue";
+const std::string TOPIC_ADDRESS = "topic";
+const std::string TOPIC_ADDRESS_AND_SUBJECT = "topic+";
+const std::string DIVIDER = "/";
+
+// Values accepted by Subscription::getMode().
+const std::string SIMPLE_SUBSCRIPTION = "simple";
+const std::string RELIABLE_SUBSCRIPTION = "reliable";
+const std::string DURABLE_SUBSCRIPTION = "durable";
+}
+
+class QueueSource : public MessageSource // MessageSource that subscribes directly to an existing, named queue
+{
+ public:
+ QueueSource(const std::string& name, AcceptMode=ACCEPT_MODE_EXPLICIT, AcquireMode=ACQUIRE_MODE_PRE_ACQUIRED, // defaults: explicit accept, pre-acquired
+ bool exclusive = false, const FieldTable& options = EMPTY_FIELD_TABLE);
+ void subscribe(qpid::client::AsyncSession& session, const std::string& destination); // issues messageSubscribe for the queue under `destination`
+ void cancel(qpid::client::AsyncSession& session, const std::string& destination); // issues messageCancel for `destination`
+ private:
+ const std::string name; // queue name passed as arg::queue
+ const AcceptMode acceptMode; // passed as arg::acceptMode
+ const AcquireMode acquireMode; // passed as arg::acquireMode
+ const bool exclusive; // passed as arg::exclusive
+ const FieldTable options; // passed as arg::arguments
+};
+
+class Subscription : public MessageSource // MessageSource that declares its own exclusive queue, binds it to exchanges and subscribes to it
+{
+ public:
+ enum SubscriptionMode {SIMPLE, RELIABLE, DURABLE}; // SIMPLE => auto-delete queue; DURABLE => durable queue; RELIABLE => neither flag set
+
+ Subscription(const std::string& name, SubscriptionMode mode = SIMPLE,
+ const FieldTable& queueOptions = EMPTY_FIELD_TABLE, const FieldTable& subscriptionOptions = EMPTY_FIELD_TABLE);
+ void add(const std::string& exchange, const std::string& key, const FieldTable& options = EMPTY_FIELD_TABLE); // records a binding; it is applied when subscribe() runs
+ void subscribe(qpid::client::AsyncSession& session, const std::string& destination); // declares the queue, applies all bindings, then subscribes
+ void cancel(qpid::client::AsyncSession& session, const std::string& destination); // cancels the subscription and deletes the queue
+
+ static SubscriptionMode getMode(const std::string& mode); // maps "simple"/"reliable"/"durable" (empty => SIMPLE) to the enum; throws on anything else
+ private:
+ struct Binding // one exchange/key/arguments triple to bind the queue with
+ {
+ Binding(const std::string& exchange, const std::string& key, const FieldTable& options = EMPTY_FIELD_TABLE);
+
+ std::string exchange;
+ std::string key;
+ FieldTable options;
+ };
+
+ typedef std::vector<Binding> Bindings; // NOTE(review): <vector> is not included directly by this file; presumably it arrives transitively - confirm
+
+ const std::string name; // requested queue name; when empty, subscribe() generates one
+ const bool autoDelete; // true only for SIMPLE mode
+ const bool durable; // true only for DURABLE mode
+ const FieldTable queueOptions; // arguments for queueDeclare
+ const FieldTable subscriptionOptions; // arguments for messageSubscribe
+ Bindings bindings;
+ std::string queue; // actual queue name, set by subscribe() and used by cancel()
+};
+
+class Exchange : public MessageSink // MessageSink that transfers messages to a named exchange
+{
+ public:
+ Exchange(const std::string& name, const std::string& defaultSubject = EMPTY_STRING,
+ bool passive = true, const std::string& type = EMPTY_STRING, bool durable = false,
+ const FieldTable& options = EMPTY_FIELD_TABLE);
+ void declare(qpid::client::AsyncSession& session, const std::string& name); // synchronous exchangeDeclare; passive unless constructed otherwise
+ void send(qpid::client::AsyncSession& session, const std::string& name, qpid::messaging::Message& message); // converts the message and transfers it with this exchange as destination
+ void cancel(qpid::client::AsyncSession& session, const std::string& name); // no-op
+ private:
+ const std::string name; // exchange name
+ const std::string defaultSubject; // routing key applied by send() when the converted message has none
+ const bool passive; // when true, declare() sends only the name with passive=true
+ const std::string type; // used only for a non-passive declare
+ const bool durable; // used only for a non-passive declare
+ const FieldTable options; // used only for a non-passive declare
+};
+
+class QueueSink : public MessageSink // MessageSink that sends to a named queue by using the queue name as routing key
+{
+ public:
+ QueueSink(const std::string& name, bool passive=true, bool exclusive=false,
+ bool autoDelete=false, bool durable=false, const FieldTable& options = EMPTY_FIELD_TABLE);
+ void declare(qpid::client::AsyncSession& session, const std::string& name); // synchronous queueDeclare; passive unless constructed otherwise
+ void send(qpid::client::AsyncSession& session, const std::string& name, qpid::messaging::Message& message); // converts the message, sets routing key to the queue name, transfers without a destination
+ void cancel(qpid::client::AsyncSession& session, const std::string& name); // no-op
+ private:
+ const std::string name; // queue name
+ const bool passive; // when true, declare() sends only the name with passive=true
+ const bool exclusive; // used only for a non-passive declare
+ const bool autoDelete; // used only for a non-passive declare
+ const bool durable; // used only for a non-passive declare
+ const FieldTable options; // used only for a non-passive declare
+};
+
+
+bool isQueue(qpid::client::Session session, const qpid::messaging::Address& address); // forward declaration; defined near the end of this file
+bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address, std::string& subject); // forward declaration; defined near the end of this file
+
+/**
+ * Looks up key in options. Returns a reference to the stored value when
+ * the key is present, otherwise a reference to the shared EMPTY_VARIANT.
+ */
+const Variant& getOption(const std::string& key, const Variant::Map& options)
+{
+    const Variant::Map::const_iterator found = options.find(key);
+    return (found != options.end()) ? found->second : EMPTY_VARIANT;
+}
+
+/**
+ * Builds the MessageSource to be used by a receiver on the given address.
+ *
+ * When isQueue() accepts the address, a QueueSource is returned that is
+ * configured from the "unacknowledged", "browse" and "exclusive" options.
+ * Otherwise the address value is queried as an exchange and a
+ * Subscription is returned (named by the "name" option, with the mode
+ * given by the "mode" option):
+ *  - topic exchange: one binding per filter pattern (only WILDCARD
+ *    filters are accepted), or a single "*" binding when no filter is given;
+ *  - fanout exchange: a single binding keyed by the address value
+ *    (filters are rejected);
+ *  - direct exchange: no binding is added (still a TODO);
+ *  - any other exchange type: NotImplementedException.
+ *
+ * Throws NotFoundException when the exchange query reports not-found.
+ */
+std::auto_ptr<MessageSource> AddressResolution::resolveSource(qpid::client::Session session,
+                                                              const Address& address,
+                                                              const Filter* filter,
+                                                              const Variant::Map& options)
+{
+    //TODO: handle case where there exists a queue and an exchange of
+    //the same name (hence an unqualified address is ambiguous)
+
+    //TODO: make sure specified address type gives sane error message
+    //if it does not match the configuration on server
+
+    if (isQueue(session, address)) {
+        //TODO: support auto-created queue as source, if requested by specific option
+
+        const bool unacknowledged = getOption(UNACKNOWLEDGED, options).asBool();
+        const bool browse = getOption(BROWSE, options).asBool();
+        const bool exclusive = getOption(EXCLUSIVE, options).asBool();
+        //TODO: extract subscribe arguments from options (e.g. either
+        //filter out already processed keys and send the rest, or have
+        //a nested map)
+        FieldTable arguments;
+
+        return std::auto_ptr<MessageSource>(
+            new QueueSource(address.value,
+                            unacknowledged ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT,
+                            browse ? ACQUIRE_MODE_NOT_ACQUIRED : ACQUIRE_MODE_PRE_ACQUIRED,
+                            exclusive,
+                            arguments));
+    }
+
+    //TODO: extract queue options (e.g. no-local) and subscription options (e.g. less important)
+    // The subscription is created (and its mode validated) before the exchange is queried.
+    std::auto_ptr<Subscription> bindings(
+        new Subscription(getOption(NAME, options).asString(),
+                         Subscription::getMode(getOption(MODE, options).asString())));
+
+    qpid::framing::ExchangeQueryResult result = session.exchangeQuery(address.value);
+    if (result.getNotFound()) {
+        throw qpid::framing::NotFoundException(QPID_MSG("Address not known: " << address));
+    }
+
+    if (result.getType() == "topic") {
+        if (!filter) {
+            //default is to receive all messages
+            bindings->add(address.value, "*", qpid::framing::FieldTable());
+        } else {
+            if (filter->type != Filter::WILDCARD) {
+                throw qpid::framing::NotImplementedException(
+                    QPID_MSG("Filters of type " << filter->type << " not supported by address " << address));
+            }
+            for (std::vector<std::string>::const_iterator pattern = filter->patterns.begin();
+                 pattern != filter->patterns.end(); ++pattern) {
+                bindings->add(address.value, *pattern, qpid::framing::FieldTable());
+            }
+        }
+    } else if (result.getType() == "fanout") {
+        if (filter) {
+            throw qpid::framing::NotImplementedException(QPID_MSG("Filters are not supported by address " << address));
+        }
+        bindings->add(address.value, address.value, qpid::framing::FieldTable());
+    } else if (result.getType() == "direct") {
+        //TODO: ????
+    } else {
+        //TODO: xml and headers exchanges
+        throw qpid::framing::NotImplementedException(QPID_MSG("Address type not recognised for " << address));
+    }
+
+    // Hand ownership of the Subscription over as a MessageSource.
+    return std::auto_ptr<MessageSource>(bindings.release());
+}
+
+
+/**
+ * Builds the MessageSink to be used by a sender on the given address.
+ *
+ * When isQueue() accepts the address a QueueSink for address.value is
+ * returned. Otherwise, when isTopic() accepts it, an Exchange sink is
+ * returned whose default subject is the one extracted by isTopic().
+ * For "topic+" addresses the value has the form exchange/subject (see
+ * AddressResolution::convert), so only the part before the divider is
+ * used as the exchange name; previously the whole value, subject
+ * included, was passed as the exchange name.
+ *
+ * Throws NotFoundException for an untyped address that is neither a
+ * queue nor an exchange, and NotImplementedException for an
+ * unrecognised address type. The options argument is currently unused.
+ */
+std::auto_ptr<MessageSink> AddressResolution::resolveSink(qpid::client::Session session,
+                                                          const qpid::messaging::Address& address,
+                                                          const qpid::messaging::Variant::Map& /*options*/)
+{
+    if (isQueue(session, address)) {
+        //TODO: support for auto-created queues as sink
+        return std::auto_ptr<MessageSink>(new QueueSink(address.value));
+    }
+
+    std::string subject;
+    if (isTopic(session, address, subject)) {
+        //TODO: support for auto-created exchanges as sink
+        std::string exchange = address.value;
+        if (address.type == TOPIC_ADDRESS_AND_SUBJECT) {
+            // Strip the "/subject" suffix so that the sink addresses the exchange itself.
+            const std::string::size_type divider = address.value.find(DIVIDER);
+            if (divider != std::string::npos) {
+                exchange = address.value.substr(0, divider);
+            }
+        }
+        return std::auto_ptr<MessageSink>(new Exchange(exchange, subject));
+    }
+
+    if (address.type.empty()) {
+        throw qpid::framing::NotFoundException(QPID_MSG("Address not known: " << address));
+    }
+    throw qpid::framing::NotImplementedException(QPID_MSG("Address type not recognised: " << address.type));
+}
+
+/** Stores the queue name and the subscribe settings for later use by subscribe(). */
+QueueSource::QueueSource(const std::string& _name,
+                         AcceptMode _acceptMode,
+                         AcquireMode _acquireMode,
+                         bool _exclusive,
+                         const FieldTable& _options)
+    : name(_name),
+      acceptMode(_acceptMode),
+      acquireMode(_acquireMode),
+      exclusive(_exclusive),
+      options(_options)
+{
+}
+
+/**
+ * Subscribes to the queue under the given destination, using the accept
+ * mode, acquire mode, exclusivity and arguments fixed at construction.
+ */
+void QueueSource::subscribe(qpid::client::AsyncSession& session, const std::string& destination)
+{
+    session.messageSubscribe(arg::queue=name, arg::destination=destination,
+                             arg::acceptMode=acceptMode, arg::acquireMode=acquireMode,
+                             arg::exclusive=exclusive, arg::arguments=options);
+}
+
+/** Cancels the subscription registered under destination; the queue itself is left untouched. */
+void QueueSource::cancel(qpid::client::AsyncSession& session, const std::string& destination)
+{
+    session.messageCancel(destination);
+}
+
+/**
+ * Derives the queue flags from the mode: SIMPLE gives an auto-delete
+ * queue, DURABLE a durable one, RELIABLE neither.
+ */
+Subscription::Subscription(const std::string& _name,
+                           SubscriptionMode mode,
+                           const FieldTable& qOptions,
+                           const FieldTable& sOptions)
+    : name(_name),
+      autoDelete(mode == SIMPLE),
+      durable(mode == DURABLE),
+      queueOptions(qOptions),
+      subscriptionOptions(sOptions)
+{
+}
+
+/** Records a binding; it is only sent to the broker when subscribe() runs. */
+void Subscription::add(const std::string& exchange, const std::string& key, const FieldTable& options)
+{
+    const Binding binding(exchange, key, options);
+    bindings.push_back(binding);
+}
+
+/**
+ * Declares an exclusive queue, applies every recorded binding to it and
+ * then subscribes to it under the given destination.
+ *
+ * The queue is named after the subscription when a name was supplied;
+ * otherwise the name is the session name followed by the destination.
+ * Auto-delete subscriptions use ACCEPT_MODE_NONE, all others
+ * ACCEPT_MODE_EXPLICIT.
+ */
+void Subscription::subscribe(qpid::client::AsyncSession& session, const std::string& destination)
+{
+    //TODO: use same scheme as JMS client for subscription queue name generation?
+    queue = name.empty() ? session.getId().getName() + destination : name;
+
+    session.queueDeclare(arg::queue=queue,
+                         arg::exclusive=true,
+                         arg::autoDelete=autoDelete,
+                         arg::durable=durable,
+                         arg::arguments=queueOptions);
+
+    for (Bindings::const_iterator binding = bindings.begin(); binding != bindings.end(); ++binding) {
+        session.exchangeBind(arg::queue=queue,
+                             arg::exchange=binding->exchange,
+                             arg::bindingKey=binding->key,
+                             arg::arguments=binding->options);
+    }
+
+    const AcceptMode accept = autoDelete ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT;
+    session.messageSubscribe(arg::queue=queue,
+                             arg::destination=destination,
+                             arg::exclusive=true,
+                             arg::acceptMode=accept,
+                             arg::arguments=subscriptionOptions);
+}
+
+/**
+ * Cancels the subscription and then deletes the queue that subscribe()
+ * declared.
+ *
+ * NOTE(review): the queue is deleted whatever the subscription mode,
+ * including DURABLE - confirm this is intended.
+ */
+void Subscription::cancel(qpid::client::AsyncSession& session, const std::string& destination)
+{
+    session.messageCancel(destination);
+    session.queueDelete(arg::queue=queue);
+}
+
+/** Copies the exchange name, binding key and binding arguments into the record. */
+Subscription::Binding::Binding(const std::string& e, const std::string& k, const FieldTable& o)
+    : exchange(e),
+      key(k),
+      options(o)
+{
+}
+
+/**
+ * Maps the textual value of the mode option to a SubscriptionMode.
+ * An empty string is treated as "simple"; any unrecognised value
+ * results in an Exception.
+ */
+Subscription::SubscriptionMode Subscription::getMode(const std::string& s)
+{
+    if (s.empty() || s == SIMPLE_SUBSCRIPTION) {
+        return SIMPLE;
+    }
+    if (s == RELIABLE_SUBSCRIPTION) {
+        return RELIABLE;
+    }
+    if (s == DURABLE_SUBSCRIPTION) {
+        return DURABLE;
+    }
+    throw Exception(QPID_MSG("Unrecognised subscription mode: " << s));
+}
+
+void convert(qpid::messaging::Message& from, qpid::client::Message& to); // forward declaration; defined later in this file and used by Exchange::send and QueueSink::send
+
+/** Stores the exchange name, the default subject and the settings used by declare(). */
+Exchange::Exchange(const std::string& _name,
+                   const std::string& _defaultSubject,
+                   bool _passive,
+                   const std::string& _type,
+                   bool _durable,
+                   const FieldTable& _options)
+    : name(_name),
+      defaultSubject(_defaultSubject),
+      passive(_passive),
+      type(_type),
+      durable(_durable),
+      options(_options)
+{
+}
+
+/**
+ * Declares the exchange through a synchronous view of the session.
+ * A passive sink sends only the exchange name with passive=true;
+ * otherwise the exchange is declared with the configured type,
+ * durability and arguments. The second argument is unused.
+ */
+void Exchange::declare(qpid::client::AsyncSession& session, const std::string&)
+{
+    //TODO: should this really be synchronous? want to get error if not valid...
+    if (!passive) {
+        sync(session).exchangeDeclare(arg::exchange=name, arg::type=type, arg::durable=durable, arg::arguments=options);
+        return;
+    }
+    sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true);
+}
+
+/**
+ * Converts the messaging-level message and transfers it to this
+ * exchange. When the converted message carries no routing key and a
+ * default subject was configured, the default subject is used as the
+ * routing key. The second argument is unused.
+ */
+void Exchange::send(qpid::client::AsyncSession& session, const std::string&, qpid::messaging::Message& m)
+{
+    qpid::client::Message message;
+    convert(m, message);
+
+    if (message.getDeliveryProperties().getRoutingKey().empty()) {
+        if (!defaultSubject.empty()) {
+            message.getDeliveryProperties().setRoutingKey(defaultSubject);
+        }
+    }
+
+    session.messageTransfer(arg::destination=name, arg::content=message);
+}
+
+/** No-op: nothing is sent to the broker when an exchange sink is cancelled. */
+void Exchange::cancel(qpid::client::AsyncSession&, const std::string&)
+{
+}
+
+/** Stores the queue name and the settings used by declare(). */
+QueueSink::QueueSink(const std::string& _name,
+                     bool _passive,
+                     bool _exclusive,
+                     bool _autoDelete,
+                     bool _durable,
+                     const FieldTable& _options)
+    : name(_name),
+      passive(_passive),
+      exclusive(_exclusive),
+      autoDelete(_autoDelete),
+      durable(_durable),
+      options(_options)
+{
+}
+
+/**
+ * Declares the queue through a synchronous view of the session.
+ * A passive sink sends only the queue name with passive=true;
+ * otherwise the queue is declared with the configured exclusivity,
+ * durability, auto-delete flag and arguments. The second argument is
+ * unused.
+ */
+void QueueSink::declare(qpid::client::AsyncSession& session, const std::string&)
+{
+    //TODO: should this really be synchronous?
+    if (!passive) {
+        sync(session).queueDeclare(arg::queue=name, arg::exclusive=exclusive, arg::durable=durable,
+                                   arg::autoDelete=autoDelete, arg::arguments=options);
+        return;
+    }
+    sync(session).queueDeclare(arg::queue=name, arg::passive=true);
+}
+/**
+ * Converts the messaging-level message, overwrites its routing key with
+ * the queue name and transfers it without naming a destination. The
+ * second argument is unused.
+ */
+void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, qpid::messaging::Message& m)
+{
+    qpid::client::Message outgoing;
+    convert(m, outgoing);
+    // The queue name always replaces whatever routing key convert() set.
+    outgoing.getDeliveryProperties().setRoutingKey(name);
+    session.messageTransfer(arg::content=outgoing);
+}
+
+/** No-op: nothing is sent to the broker when a queue sink is cancelled. */
+void QueueSink::cancel(qpid::client::AsyncSession&, const std::string&)
+{
+}
+
+/**
+ * Encodes the message with a default-constructed codec of the given
+ * type and then stamps the message with that codec's content type.
+ */
+template <class Codec>
+void encode(qpid::messaging::Message& from)
+{
+    Codec codec;
+    from.encode(codec);
+    from.setContentType(Codec::contentType);
+}
+
+// Implemented in Codecs.cpp.
+void translate(const Variant::Map& from, FieldTable& to);
+
+/**
+ * Fills an AMQP 0-10 client message from a messaging-level message.
+ *
+ * List or map content is encoded first (which also sets the content
+ * type on the source message). After that the bytes, the subject (as
+ * routing key), the content type, the reply-to address (only when its
+ * value is non-empty) and the headers (as application headers) are
+ * copied across.
+ */
+void convert(qpid::messaging::Message& from, qpid::client::Message& to)
+{
+    //TODO: need to avoid copying as much as possible
+    if (from.getContent().isList()) {
+        encode<ListCodec>(from);
+    }
+    if (from.getContent().isMap()) {
+        encode<MapCodec>(from);
+    }
+
+    to.setData(from.getBytes());
+
+    to.getDeliveryProperties().setRoutingKey(from.getSubject());
+    //TODO: set other delivery properties
+
+    to.getMessageProperties().setContentType(from.getContentType());
+    const Address& replyTo = from.getReplyTo();
+    if (!replyTo.value.empty()) {
+        to.getMessageProperties().setReplyTo(AddressResolution::convert(replyTo));
+    }
+    translate(from.getHeaders(), to.getMessageProperties().getApplicationHeaders());
+    //TODO: set other message properties
+}
+
+/**
+ * Converts an AMQP 0-10 reply-to into an Address:
+ *  - exchange and routing key both set: "exchange/key" of type "topic+";
+ *  - only the exchange set: the exchange, of type "topic";
+ *  - only the routing key set: the key, of type "queue";
+ *  - neither set: an empty Address.
+ */
+Address AddressResolution::convert(const qpid::framing::ReplyTo& rt)
+{
+    const bool hasExchange = !rt.getExchange().empty();
+    const bool hasRoutingKey = !rt.getRoutingKey().empty();
+
+    if (hasExchange && hasRoutingKey) {
+        return Address(rt.getExchange() + DIVIDER + rt.getRoutingKey(), TOPIC_ADDRESS_AND_SUBJECT);
+    }
+    if (hasExchange) {
+        return Address(rt.getExchange(), TOPIC_ADDRESS);
+    }
+    if (hasRoutingKey) {
+        return Address(rt.getRoutingKey(), QUEUE_ADDRESS);
+    }
+    return Address();//empty address
+}
+
+/**
+ * Converts an Address into an AMQP 0-10 reply-to:
+ *  - type "topic": the value becomes the exchange, with no routing key;
+ *  - type "topic+": the value is split at the first divider into
+ *    exchange and routing key (no divider: the whole value is the
+ *    exchange);
+ *  - type "queue" or no type: the value becomes the routing key;
+ *  - any other type: logged at notice level and treated as a queue.
+ */
+qpid::framing::ReplyTo AddressResolution::convert(const Address& address)
+{
+    if (address.type == TOPIC_ADDRESS) {
+        return ReplyTo(address.value, EMPTY_STRING);
+    }
+
+    if (address.type == TOPIC_ADDRESS_AND_SUBJECT) {
+        //need to split the value
+        const std::string::size_type divider = address.value.find(DIVIDER);
+        if (divider == std::string::npos) {
+            return ReplyTo(address.value, EMPTY_STRING);
+        }
+        const std::string exchange = address.value.substr(0, divider);
+        // substr() yields an empty routing key when the divider is the last character.
+        const std::string routingKey = address.value.substr(divider + 1);
+        return ReplyTo(exchange, routingKey);
+    }
+
+    const bool isQueueAddress = address.type == QUEUE_ADDRESS || address.type.empty();
+    if (!isQueueAddress) {
+        QPID_LOG(notice, "Unrecognised type for reply-to: " << address.type);
+        //treat as queue
+    }
+    return ReplyTo(EMPTY_STRING, address.value);
+}
+
+/**
+ * Returns true when the address is explicitly typed as a queue, or when
+ * it has no type and a queue query for its value reports a queue of
+ * that name. The broker is only queried for untyped addresses.
+ */
+bool isQueue(qpid::client::Session session, const qpid::messaging::Address& address)
+{
+    if (address.type == QUEUE_ADDRESS) {
+        return true;
+    }
+    if (!address.type.empty()) {
+        return false;
+    }
+    return session.queueQuery(address.value).getQueue() == address.value;
+}
+
+/**
+ * Decides whether the address refers to an exchange.
+ *
+ *  - no type: true when an exchange query for the value does not report
+ *    not-found;
+ *  - type "topic": always true;
+ *  - type "topic+": always true; when the value contains a divider that
+ *    is followed by at least one character, the text after the first
+ *    divider is written to subject;
+ *  - any other type: false.
+ *
+ * subject is left untouched in every case that does not extract one.
+ */
+bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address, std::string& subject)
+{
+    if (address.type.empty()) {
+        return !session.exchangeQuery(address.value).getNotFound();
+    } else if (address.type == TOPIC_ADDRESS) {
+        return true;
+    } else if (address.type == TOPIC_ADDRESS_AND_SUBJECT) {
+        // Only the subject is needed here; the exchange part was previously
+        // extracted into a local that was never used.
+        const std::string::size_type i = address.value.find(DIVIDER);
+        if (i != std::string::npos && i + 1 < address.value.size()) {
+            subject = address.value.substr(i + 1);
+        }
+        return true;
+    } else {
+        return false;
+    }
+}
+
+
+}}} // namespace qpid::client::amqp0_10
Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.h Tue Aug 25 17:57:34 2009
@@ -0,0 +1,68 @@
+#ifndef QPID_CLIENT_AMQP0_10_ADDRESSRESOLUTION_H
+#define QPID_CLIENT_AMQP0_10_ADDRESSRESOLUTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/Variant.h"
+#include "qpid/client/Session.h"
+
+namespace qpid {
+
+namespace framing{
+class ReplyTo;
+}
+
+namespace messaging {
+class Address;
+class Filter;
+}
+
+namespace client {
+namespace amqp0_10 {
+
+class MessageSource;
+class MessageSink;
+
+/**
+ * Maps a generic Address (plus, for sources, an optional Filter) and
+ * an option map onto the AMQP 0-10 MessageSource or MessageSink to be
+ * used for that address, and converts between Address and the AMQP
+ * 0-10 ReplyTo struct.
+ */
+class AddressResolution
+{
+  public:
+    /** Returns the (caller-owned) source to use for receiving from the address. */
+    std::auto_ptr<MessageSource> resolveSource(qpid::client::Session session,
+                                               const qpid::messaging::Address& address,
+                                               const qpid::messaging::Filter* filter,
+                                               const qpid::messaging::Variant::Map& options);
+
+    /** Returns the (caller-owned) sink to use for sending to the address. */
+    std::auto_ptr<MessageSink> resolveSink(qpid::client::Session session,
+                                           const qpid::messaging::Address& address,
+                                           const qpid::messaging::Variant::Map& options);
+
+    /** Conversions between the generic address and the AMQP 0-10 reply-to. */
+    static qpid::messaging::Address convert(const qpid::framing::ReplyTo&);
+    static qpid::framing::ReplyTo convert(const qpid::messaging::Address&);
+};
+}}} // namespace qpid::client::amqp0_10
+
+#endif /*!QPID_CLIENT_AMQP0_10_ADDRESSRESOLUTION_H*/
Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/Codecs.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/Codecs.cpp?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/Codecs.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/Codecs.cpp Tue Aug 25 17:57:34 2009
@@ -0,0 +1,299 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/amqp0_10/Codecs.h"
+#include "qpid/messaging/Variant.h"
+#include "qpid/framing/Array.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/FieldValue.h"
+#include "qpid/framing/List.h"
+#include <algorithm>
+#include <functional>
+
+using namespace qpid::framing;
+using namespace qpid::messaging;
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+namespace {
+//Encoding names set on Variants by setEncodingFor() and toVariant() below
+const std::string iso885915("iso-8859-15");
+const std::string utf8("utf8");
+const std::string utf16("utf16");
+const std::string amqp0_10_binary("amqp0-10:binary");
+const std::string amqp0_10_bit("amqp0-10:bit");
+const std::string amqp0_10_datetime("amqp0-10:datetime");
+const std::string amqp0_10_struct("amqp0-10:struct");
+}
+
+/**
+ * Applies f to each element of 'from' and inserts every result into
+ * 'to' (using an insert iterator positioned at to.begin()).
+ */
+template <class T, class U, class F> void convert(const T& from, U& to, F f)
+{
+    std::insert_iterator<U> sink(to, to.begin());
+    std::transform(from.begin(), from.end(), sink, f);
+}
+
+//Forward declarations: the element converters below are used by the
+//collection helpers before their definitions appear
+Variant::Map::value_type toVariantMapEntry(const FieldTable::value_type& in);
+FieldTable::value_type toFieldTableEntry(const Variant::Map::value_type& in);
+Variant toVariant(boost::shared_ptr<FieldValue> in);
+boost::shared_ptr<FieldValue> toFieldValue(const Variant& in);
+
+/**
+ * Extracts a collection of type T from the field value 'in' and
+ * converts its elements, via f, into the container 'u'. Any result
+ * returned by getEncodedValue() is not examined.
+ */
+template <class T, class U, class F> void translate(boost::shared_ptr<FieldValue> in, U& u, F f)
+{
+    T decoded;
+    getEncodedValue<T>(in, decoded);
+    convert(decoded, u, f);
+}
+
+/**
+ * Builds a new (caller-owned) field value of type T from the
+ * container 'u', converting each element with f into T's ValueType.
+ */
+template <class T, class U, class F> T* toFieldValueCollection(const U& u, F f)
+{
+    typename T::ValueType converted;
+    convert(u, converted, f);
+    return new T(converted);
+}
+
+/**
+ * Converts a variant map into a new (caller-owned) FieldTableValue.
+ */
+FieldTableValue* toFieldTableValue(const Variant::Map& map)
+{
+    FieldTable table;
+    convert(map, table, &toFieldTableEntry);
+    return new FieldTableValue(table);
+}
+
+/**
+ * Converts a variant list into a new (caller-owned) ListValue.
+ */
+ListValue* toListValue(const Variant::List& list)
+{
+    List items;
+    convert(list, items, &toFieldValue);
+    return new ListValue(items);
+}
+
+/**
+ * Sets the encoding of 'out' according to the given type code; codes
+ * not listed here leave the encoding unchanged.
+ */
+void setEncodingFor(Variant& out, uint8_t code)
+{
+    const std::string* encoding = 0;
+    switch (code) {
+      case 0x80: case 0x90: case 0xa0:
+        encoding = &amqp0_10_binary;
+        break;
+      case 0x84: case 0x94:
+        encoding = &iso885915;
+        break;
+      case 0x85: case 0x95:
+        encoding = &utf8;
+        break;
+      case 0x86: case 0x96:
+        encoding = &utf16;
+        break;
+      case 0xab:
+        encoding = &amqp0_10_struct;
+        break;
+      default:
+        //no encoding associated with this code
+        break;
+    }
+    if (encoding) {
+        out.setEncoding(*encoding);
+    }
+}
+
+/**
+ * Converts an AMQP 0-10 field value into a Variant, choosing the
+ * variant type from the field value's type code. Codes that are not
+ * handled yield a default-constructed Variant.
+ *
+ * Where a value also carries an encoding, the encoding is applied
+ * after the value has been assigned so that the assignment cannot
+ * discard it.
+ */
+Variant toVariant(boost::shared_ptr<FieldValue> in)
+{
+    Variant out;
+    //based on AMQP 0-10 typecode, pick most appropriate variant type
+    switch (in->getType()) {
+        //Fixed Width types:
+      case 0x01: //as 0x02, but tagged as binary
+        out = in->getIntegerValue<int8_t, 1>();
+        out.setEncoding(amqp0_10_binary);
+        break;
+      case 0x02: out = in->getIntegerValue<int8_t, 1>(); break;
+      case 0x03: out = in->getIntegerValue<uint8_t, 1>(); break;
+      case 0x04: break; //TODO: iso-8859-15 char
+      case 0x08: out = in->getIntegerValue<bool, 1>(); break;
+      case 0x10: //as 0x11, but tagged as binary
+        out = in->getIntegerValue<int16_t, 2>();
+        out.setEncoding(amqp0_10_binary);
+        break;
+      case 0x11: out = in->getIntegerValue<int16_t, 2>(); break;
+      case 0x12: out = in->getIntegerValue<uint16_t, 2>(); break;
+      case 0x20: //as 0x21, but tagged as binary
+        out = in->getIntegerValue<int32_t, 4>();
+        out.setEncoding(amqp0_10_binary);
+        break;
+      case 0x21: out = in->getIntegerValue<int32_t, 4>(); break;
+      case 0x22: out = in->getIntegerValue<uint32_t, 4>(); break;
+      case 0x23: out = in->get<float>(); break;
+      case 0x27: break; //TODO: utf-32 char
+      case 0x30: //as 0x31, but tagged as binary
+        out = in->getIntegerValue<int64_t, 8>();
+        out.setEncoding(amqp0_10_binary);
+        break;
+      case 0x31: out = in->getIntegerValue<int64_t, 8>(); break;
+      case 0x38: //treat datetime as uint64_t, but set encoding
+        out = in->getIntegerValue<uint64_t, 8>();
+        out.setEncoding(amqp0_10_datetime);
+        break;
+      case 0x32: out = in->getIntegerValue<uint64_t, 8>(); break;
+      case 0x33: out = in->get<double>(); break;
+
+        //TODO: figure out whether and how to map values with codes 0x40-0xd8
+
+      case 0xf0: break;//void, which is the default value for Variant
+      case 0xf1: out.setEncoding(amqp0_10_bit); break;//treat 'bit' as void, which is the default value for Variant
+
+        //Variable Width types:
+        //strings:
+      case 0x80:
+      case 0x84:
+      case 0x85:
+      case 0x86:
+      case 0x90:
+      case 0x94:
+      case 0x95:
+      case 0x96:
+      case 0xa0:
+      case 0xab:
+        out = in->get<std::string>();
+        setEncodingFor(out, in->getType());
+        break;
+
+      case 0xa8:
+        out = Variant::Map();
+        translate<FieldTable>(in, out.asMap(), &toVariantMapEntry);
+        break;
+
+      case 0xa9:
+        out = Variant::List();
+        translate<List>(in, out.asList(), &toVariant);
+        break;
+      case 0xaa: //convert amqp0-10 array into variant list
+        out = Variant::List();
+        translate<Array>(in, out.asList(), &toVariant);
+        break;
+
+      default:
+        //error?
+        break;
+    }
+    return out;
+}
+
+/**
+ * Converts a Variant into a newly allocated AMQP 0-10 field value of
+ * the corresponding type. Strings are always emitted as Str16Value.
+ * A variant type not listed in the switch yields an empty pointer.
+ */
+boost::shared_ptr<FieldValue> toFieldValue(const Variant& in)
+{
+    boost::shared_ptr<FieldValue> out;
+    switch (in.getType()) {
+      case VOID:   out.reset(new VoidValue()); break;
+      case BOOL:   out.reset(new BoolValue(in.asBool())); break;
+      case UINT8:  out.reset(new Unsigned8Value(in.asUint8())); break;
+      case UINT16: out.reset(new Unsigned16Value(in.asUint16())); break;
+      case UINT32: out.reset(new Unsigned32Value(in.asUint32())); break;
+      case UINT64: out.reset(new Unsigned64Value(in.asUint64())); break;
+      case INT8:   out.reset(new Integer8Value(in.asInt8())); break;
+      case INT16:  out.reset(new Integer16Value(in.asInt16())); break;
+      case INT32:  out.reset(new Integer32Value(in.asInt32())); break;
+      case INT64:  out.reset(new Integer64Value(in.asInt64())); break;
+      case FLOAT:  out.reset(new FloatValue(in.asFloat())); break;
+      case DOUBLE: out.reset(new DoubleValue(in.asDouble())); break;
+        //TODO: check encoding (and length?) when deciding what AMQP type to treat string as
+      case STRING: out.reset(new Str16Value(in.asString())); break;
+      case MAP:
+        out.reset(toFieldTableValue(in.asMap()));
+        break;
+      case LIST:
+        out.reset(toListValue(in.asList()));
+        break;
+    }
+    return out;
+}
+
+/** Converts one field table entry into the equivalent variant map entry (same key). */
+Variant::Map::value_type toVariantMapEntry(const FieldTable::value_type& in)
+{
+    Variant::Map::value_type entry(in.first, toVariant(in.second));
+    return entry;
+}
+
+/** Converts one variant map entry into the equivalent field table entry (same key). */
+FieldTable::value_type toFieldTableEntry(const Variant::Map::value_type& in)
+{
+    FieldTable::value_type entry(in.first, toFieldValue(in.second));
+    return entry;
+}
+
+/**
+ * Owns a heap-allocated byte array of the requested size together
+ * with a Buffer wrapping it, for encoding a single value into a
+ * string.
+ */
+struct EncodeBuffer
+{
+    char* data;    //owned; released in the destructor
+    Buffer buffer; //wraps 'data' (declared after it, so initialised after it)
+
+    explicit EncodeBuffer(size_t size) : data(new char[size]), buffer(data, size) {}
+    ~EncodeBuffer() { delete[] data; }
+
+    /** Encodes t into the buffer. */
+    template <class T> void encode(T& t) { t.encode(buffer); }
+
+    /** Copies buffer.getSize() bytes from the start of the array into s. */
+    void getData(std::string& s) {
+        s.assign(data, buffer.getSize());
+    }
+
+  private:
+    //not copyable: a copy would share 'data' and delete it a second time
+    EncodeBuffer(const EncodeBuffer&);
+    EncodeBuffer& operator=(const EncodeBuffer&);
+};
+
+/**
+ * Wraps the bytes of a string in a Buffer so that a value can be
+ * decoded from them. The string is not copied, so it must outlive
+ * this object.
+ */
+struct DecodeBuffer
+{
+    Buffer buffer;
+
+    //Buffer takes a non-const char*, hence the const_cast
+    DecodeBuffer(const std::string& encoded)
+        : buffer(const_cast<char*>(encoded.data()), encoded.size()) {}
+
+    /** Decodes t from the buffer. */
+    template <class T> void decode(T& t) { t.decode(buffer); }
+};
+
+/**
+ * Converts 'value' element-wise (via f) into a T, encodes that T and
+ * stores the encoded bytes in 'data'.
+ */
+template <class T, class U, class F> void _encode(const U& value, std::string& data, F f)
+{
+    T encodable;
+    convert(value, encodable, f);
+    EncodeBuffer out(encodable.encodedSize());
+    out.encode(encodable);
+    out.getData(data);
+}
+
+/**
+ * Decodes a T from the bytes in 'data' and converts it element-wise
+ * (via f) into 'value'.
+ */
+template <class T, class U, class F> void _decode(const std::string& data, U& value, F f)
+{
+    T decoded;
+    DecodeBuffer in(data);
+    in.decode(decoded);
+    convert(decoded, value, f);
+}
+
+/** Encodes the map held by 'value' as an AMQP 0-10 field table into 'data'. */
+void MapCodec::encode(const Variant& value, std::string& data)
+{
+    const Variant::Map& map = value.asMap();
+    _encode<FieldTable>(map, data, &toFieldTableEntry);
+}
+
+/** Replaces 'value' with a map decoded from the field table bytes in 'data'. */
+void MapCodec::decode(const std::string& data, Variant& value)
+{
+    value = Variant::Map();
+    Variant::Map& map = value.asMap();
+    _decode<FieldTable>(data, map, &toVariantMapEntry);
+}
+
+/** Encodes the list held by 'value' as an AMQP 0-10 list into 'data'. */
+void ListCodec::encode(const Variant& value, std::string& data)
+{
+    const Variant::List& list = value.asList();
+    _encode<List>(list, data, &toFieldValue);
+}
+
+/** Replaces 'value' with a list decoded from the list bytes in 'data'. */
+void ListCodec::decode(const std::string& data, Variant& value)
+{
+    value = Variant::List();
+    Variant::List& list = value.asList();
+    _decode<List>(data, list, &toVariant);
+}
+
+/** Inserts the field table equivalent of every entry of 'from' into 'to'. */
+void translate(const Variant::Map& from, FieldTable& to)
+{
+    convert(from, to, &toFieldTableEntry);
+}
+
+/** Inserts the variant equivalent of every entry of 'from' into 'to'. */
+void translate(const FieldTable& from, Variant::Map& to)
+{
+    convert(from, to, &toVariantMapEntry);
+}
+
+//Content type names associated with each codec
+const std::string ListCodec::contentType("amqp0_10/list");
+const std::string MapCodec::contentType("amqp0_10/map");
+
+}}} // namespace qpid::client::amqp0_10
Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.cpp?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.cpp Tue Aug 25 17:57:34 2009
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "CompletionTracker.h"
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+using qpid::framing::SequenceNumber;
+
+/** Associates 'token' with the given command id, replacing any token already held for it. */
+void CompletionTracker::track(SequenceNumber command, void* token)
+{
+    tokens[command] = token;
+}
+
+/**
+ * Records that every command up to and including 'command' has
+ * completed: the token of the highest tracked command not after
+ * 'command' becomes the last completed token, and all entries up to
+ * and including it are forgotten. If no tracked command is covered,
+ * nothing changes.
+ */
+void CompletionTracker::completedTo(SequenceNumber command)
+{
+    //upper_bound gives the first entry strictly after 'command'; only
+    //the entries before it have completed (lower_bound would wrongly
+    //pick a later command when 'command' itself is not tracked, and
+    //would complete nothing when all tracked commands precede it)
+    Tokens::iterator firstPending = tokens.upper_bound(command);
+    if (firstPending != tokens.begin()) {
+        Tokens::iterator last = firstPending;
+        --last;
+        lastCompleted = last->second;
+        tokens.erase(tokens.begin(), firstPending);
+    }
+}
+
+/** Returns the token most recently recorded as completed by completedTo(). */
+void* CompletionTracker::getLastCompletedToken()
+{
+    return lastCompleted;
+}
+
+}}} // namespace qpid::client::amqp0_10
Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.h?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.h Tue Aug 25 17:57:34 2009
@@ -0,0 +1,50 @@
+#ifndef QPID_CLIENT_AMQP0_10_COMPLETIONTRACKER_H
+#define QPID_CLIENT_AMQP0_10_COMPLETIONTRACKER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/SequenceNumber.h"
+#include <map>
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+/**
+ * Provides a mapping from command ids to application supplied
+ * 'tokens', and is used to determine when the sending or
+ * acknowledging of a specific message is complete.
+ */
+class CompletionTracker
+{
+  public:
+    /** Starts with no tracked commands and a null last-completed token. */
+    CompletionTracker() : lastCompleted(0) {}
+
+    /** Remembers 'token' against the given command id. */
+    void track(qpid::framing::SequenceNumber command, void* token);
+    /** Notes completion of commands up to the given command id. */
+    void completedTo(qpid::framing::SequenceNumber command);
+    /** Returns the token of the last command noted as completed (null if none yet). */
+    void* getLastCompletedToken();
+  private:
+    typedef std::map<qpid::framing::SequenceNumber, void*> Tokens;
+    Tokens tokens;
+    void* lastCompleted;
+};
+}}} // namespace qpid::client::amqp0_10
+
+#endif /*!QPID_CLIENT_AMQP0_10_COMPLETIONTRACKER_H*/
Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp Tue Aug 25 17:57:34 2009
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ConnectionImpl.h"
+#include "SessionImpl.h"
+#include "qpid/messaging/Session.h"
+#include "qpid/client/ConnectionSettings.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+using qpid::messaging::Variant;
+
+/**
+ * If 'map' has an entry for 'key', converts it to T and stores it in
+ * 'value'; otherwise leaves 'value' untouched.
+ */
+template <class T> void setIfFound(const Variant::Map& map, const std::string& key, T& value)
+{
+    Variant::Map::const_iterator entry = map.find(key);
+    if (entry == map.end()) {
+        return;
+    }
+    value = static_cast<T>(entry->second);
+}
+
+/**
+ * Copies each recognised option present in 'from' into the matching
+ * field of 'to'; options that are absent leave the field as it was.
+ */
+void convert(const Variant::Map& from, ConnectionSettings& to)
+{
+    //authentication
+    setIfFound(from, "username", to.username);
+    setIfFound(from, "password", to.password);
+    setIfFound(from, "sasl-mechanism", to.mechanism);
+    setIfFound(from, "sasl-service", to.service);
+    setIfFound(from, "sasl-min-ssf", to.minSsf);
+    setIfFound(from, "sasl-max-ssf", to.maxSsf);
+
+    //transport
+    setIfFound(from, "heartbeat", to.heartbeat);
+    setIfFound(from, "tcp-nodelay", to.tcpNoDelay);
+
+    //remaining settings
+    setIfFound(from, "locale", to.locale);
+    setIfFound(from, "max-channels", to.maxChannels);
+    setIfFound(from, "max-frame-size", to.maxFrameSize);
+    setIfFound(from, "bounds", to.bounds);
+}
+
+/**
+ * Opens the underlying connection to the given url, applying any
+ * recognised settings found in 'options'.
+ */
+ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options)
+{
+    QPID_LOG(debug, "Opening connection to " << url << " with " << options);
+    Url brokerUrl(url);
+    ConnectionSettings connectionSettings;
+    convert(options, connectionSettings);
+    connection.open(brokerUrl, connectionSettings);
+}
+
+/** Closes the underlying connection. */
+void ConnectionImpl::close()
+{
+    connection.close();
+}
+
+/**
+ * Creates a new session on the underlying connection and returns it
+ * wrapped in a messaging Session handle.
+ */
+qpid::messaging::Session ConnectionImpl::newSession()
+{
+    return qpid::messaging::Session(new SessionImpl(connection.newSession()));
+}
+
+}}} // namespace qpid::client::amqp0_10
Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h Tue Aug 25 17:57:34 2009
@@ -0,0 +1,43 @@
+#ifndef QPID_CLIENT_AMQP0_10_CONNECTIONIMPL_H
+#define QPID_CLIENT_AMQP0_10_CONNECTIONIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/ConnectionImpl.h"
+#include "qpid/messaging/Variant.h"
+#include "qpid/client/Connection.h"
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+/**
+ * Implementation of the messaging ConnectionImpl interface on top of
+ * a qpid::client::Connection.
+ */
+class ConnectionImpl : public qpid::messaging::ConnectionImpl
+{
+  public:
+    /** Opens a connection to 'url' using the settings found in 'options'. */
+    ConnectionImpl(const std::string& url, const qpid::messaging::Variant::Map& options);
+    void close();
+    qpid::messaging::Session newSession();
+  private:
+    qpid::client::Connection connection;
+};
+}}} // namespace qpid::client::amqp0_10
+
+#endif /*!QPID_CLIENT_AMQP0_10_CONNECTIONIMPL_H*/
Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp Tue Aug 25 17:57:34 2009
@@ -0,0 +1,241 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/amqp0_10/IncomingMessages.h"
+#include "qpid/client/amqp0_10/AddressResolution.h"
+#include "qpid/client/amqp0_10/Codecs.h"
+#include "qpid/client/SessionImpl.h"
+#include "qpid/client/SessionBase_0_10Access.h"
+#include "qpid/log/Statement.h"
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/Variant.h"
+#include "qpid/framing/DeliveryProperties.h"
+#include "qpid/framing/FrameSet.h"
+#include "qpid/framing/MessageProperties.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/enum.h"
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+using namespace qpid::framing;
+using namespace qpid::framing::message;
+using qpid::sys::AbsTime;
+using qpid::sys::Duration;
+using qpid::messaging::Variant;
+
+namespace {
+const std::string EMPTY_STRING;
+
+
+/** Handler that declines every transfer offered to it. */
+struct GetNone : IncomingMessages::Handler
+{
+    bool accept(IncomingMessages::MessageTransfer&)
+    {
+        return false;
+    }
+};
+
+/**
+ * Handler that takes every transfer offered to it, retrieving it
+ * without asking for a message to be populated.
+ */
+struct GetAny : IncomingMessages::Handler
+{
+    bool accept(IncomingMessages::MessageTransfer& transfer)
+    {
+        qpid::messaging::Message* const noMessage = 0;
+        transfer.retrieve(noMessage);
+        return true;
+    }
+};
+
+/**
+ * Predicate matching message transfers for one destination; the ids
+ * of all commands it has matched are collected in 'ids'.
+ */
+struct MatchAndTrack
+{
+    const std::string destination;
+    SequenceSet ids;
+
+    MatchAndTrack(const std::string& d) : destination(d) {}
+
+    bool operator()(boost::shared_ptr<qpid::framing::FrameSet> command)
+    {
+        const bool matched = command->as<MessageTransferBody>()->getDestination() == destination;
+        if (matched) {
+            ids.add(command->getId());
+        }
+        return matched;
+    }
+};
+}
+
+/**
+ * Attaches to the given session, taking the default queue of the
+ * session's demux as the source of incoming framesets.
+ */
+IncomingMessages::IncomingMessages(qpid::client::AsyncSession s)
+    : session(s),
+      incoming(SessionBase_0_10Access(session).get()->getDemux().getDefault())
+{
+}
+
+/**
+ * Offers transfers to the handler until it accepts one: first those
+ * already held on the received list, then (via process()) those
+ * arriving on the incoming queue within the timeout. Returns true if
+ * a transfer was accepted.
+ */
+bool IncomingMessages::get(Handler& handler, Duration timeout)
+{
+    //search through received list for any transfer of interest:
+    FrameSetQueue::iterator pending = received.begin();
+    while (pending != received.end()) {
+        MessageTransfer transfer(*pending, *this);
+        if (handler.accept(transfer)) {
+            received.erase(pending);
+            return true;
+        }
+        ++pending;
+    }
+    //none found, check incoming:
+    return process(&handler, timeout);
+}
+
+/** Accepts all transfers recorded as unaccepted and clears that record. */
+void IncomingMessages::accept()
+{
+    session.messageAccept(unaccepted);
+    unaccepted.clear();
+}
+
+/**
+ * Retrieves every transfer currently held or available, then releases
+ * all transfers recorded as unaccepted.
+ */
+void IncomingMessages::releaseAll()
+{
+    //first process any received messages...
+    for (; !received.empty(); received.pop_front()) {
+        retrieve(received.front(), 0);
+    }
+    //then pump out any available messages from incoming queue...
+    GetAny drain;
+    while (process(&drain, 0)) {}
+    //now release all messages
+    session.messageRelease(unaccepted);
+    unaccepted.clear();
+}
+
+/**
+ * Releases every transfer for the given destination that has arrived
+ * but has not yet been retrieved.
+ */
+void IncomingMessages::releasePending(const std::string& destination)
+{
+    //first pump all available messages from incoming to received...
+    while (process(0, 0)) {}
+
+    //now remove all messages for this destination from received list, recording their ids...
+    MatchAndTrack match(destination);
+    FrameSetQueue::iterator i = received.begin();
+    while (i != received.end()) {
+        if (match(*i)) {
+            i = received.erase(i);
+        } else {
+            ++i;
+        }
+    }
+    //now release those messages
+    session.messageRelease(match.ids);
+}
+
+/**
+ * Pops framesets from the session's incoming queue, waiting for up to
+ * the specified duration in total. Each message transfer is offered
+ * to the handler (if one is supplied); as soon as the handler accepts
+ * one, true is returned. Transfers that are not accepted - or all
+ * transfers when no handler is supplied - are appended to the
+ * received queue for later. Returns false once nothing more can be
+ * popped within the remaining time.
+ */
+bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
+{
+ AbsTime deadline(AbsTime::now(), duration);
+ FrameSet::shared_ptr content;
+ //after each pop the timeout is recomputed as the time left until the deadline
+ for (Duration timeout = duration; incoming->pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) {
+ if (content->isA<MessageTransferBody>()) {
+ MessageTransfer transfer(content, *this);
+ if (handler && handler->accept(transfer)) {
+ QPID_LOG(debug, "Delivered " << *content->getMethod());
+ return true;
+ } else {
+ //received message for another destination, keep for later
+ QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue");
+ received.push_back(content);
+ }
+ } else {
+ //TODO: handle other types of commands (e.g. message-accept, message-flow etc)
+ }
+ }
+ return false;
+}
+
+void populate(qpid::messaging::Message& message, FrameSet& command);
+
+/**
+ * Called when a transfer is retrieved: fills in 'message' from the
+ * command if a message was supplied, records the command for later
+ * acceptance when it was pre-acquired with explicit accept mode, and
+ * marks the command as completed on the session.
+ */
+void IncomingMessages::retrieve(FrameSetPtr command, qpid::messaging::Message* message)
+{
+    if (message) {
+        populate(*message, *command);
+    }
+    const MessageTransferBody* transfer = command->as<MessageTransferBody>();
+    if (transfer->getAcquireMode() == ACQUIRE_MODE_PRE_ACQUIRED) {
+        if (transfer->getAcceptMode() == ACCEPT_MODE_EXPLICIT) {
+            unaccepted.add(command->getId());
+        }
+    }
+    session.markCompleted(command->getId(), false, false);
+}
+
+/** Pairs a received frameset with the IncomingMessages instance it belongs to. */
+IncomingMessages::MessageTransfer::MessageTransfer(FrameSetPtr c, IncomingMessages& p)
+    : content(c), parent(p)
+{
+}
+
+/** Returns the destination named in the underlying message transfer command. */
+const std::string& IncomingMessages::MessageTransfer::getDestination()
+{
+    const MessageTransferBody* body = content->as<MessageTransferBody>();
+    return body->getDestination();
+}
+/** Retrieves this transfer through the parent, optionally populating 'message'. */
+void IncomingMessages::MessageTransfer::retrieve(qpid::messaging::Message* message)
+{
+    parent.retrieve(content, message);
+}
+
+void translate(const FieldTable& from, Variant::Map& to);//implemented in Codecs.cpp
+
+/**
+ * Copies what is currently mapped from the AMQP 0-10 delivery and
+ * message properties onto the message: routing key as subject,
+ * content type, reply-to (when set) and application headers. Either
+ * properties pointer may be null, in which case it is skipped.
+ */
+void populateHeaders(qpid::messaging::Message& message,
+                     const DeliveryProperties* deliveryProperties,
+                     const MessageProperties* messageProperties)
+{
+    if (deliveryProperties) {
+        message.setSubject(deliveryProperties->getRoutingKey());
+        //TODO: convert other delivery properties
+    }
+    if (messageProperties) {
+        message.setContentType(messageProperties->getContentType());
+        if (messageProperties->hasReplyTo()) {
+            message.setReplyTo(AddressResolution::convert(messageProperties->getReplyTo()));
+        }
+        translate(messageProperties->getApplicationHeaders(), message.getHeaders());
+        //TODO: convert other message properties
+    }
+}
+
+/**
+ * Populates the message from the delivery and message properties held
+ * in the given header body. A null header body is ignored, leaving
+ * the message's headers untouched.
+ */
+void populateHeaders(qpid::messaging::Message& message, const AMQHeaderBody* headers)
+{
+    //NOTE(review): guards the dereference below; presumably a command
+    //without a header segment yields a null pointer here - confirm
+    //against FrameSet::getHeaders()
+    if (!headers) {
+        return;
+    }
+    populateHeaders(message, headers->get<DeliveryProperties>(), headers->get<MessageProperties>());
+}
+
+/**
+ * Fills in 'message' from a received command: internal id, content
+ * bytes and headers, then decodes the content when its content type
+ * is that of the list or map codec.
+ */
+void populate(qpid::messaging::Message& message, FrameSet& command)
+{
+    //need to be able to link the message back to the transfer it was delivered by
+    //e.g. for rejecting. TODO: hide this from API
+    uint32_t commandId = command.getId();
+    message.setInternalId(reinterpret_cast<void*>(commandId));
+
+    command.getContent(message.getBytes());
+
+    populateHeaders(message, command.getHeaders());
+
+    //decode content if necessary
+    const std::string& contentType = message.getContentType();
+    if (contentType == ListCodec::contentType) {
+        ListCodec listCodec;
+        message.decode(listCodec);
+    } else if (contentType == MapCodec::contentType) {
+        MapCodec mapCodec;
+        message.decode(mapCodec);
+    }
+}
+
+
+}}} // namespace qpid::client::amqp0_10
Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h Tue Aug 25 17:57:34 2009
@@ -0,0 +1,91 @@
+#ifndef QPID_CLIENT_AMQP0_10_INCOMINGMESSAGES_H
+#define QPID_CLIENT_AMQP0_10_INCOMINGMESSAGES_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include <boost/shared_ptr.hpp>
+#include "qpid/client/AsyncSession.h"
+#include "qpid/framing/SequenceSet.h"
+#include "qpid/sys/BlockingQueue.h"
+#include "qpid/sys/Time.h"
+
+namespace qpid {
+
+namespace framing{
+class FrameSet;
+}
+
+namespace messaging {
+class Message;
+}
+
+namespace client {
+namespace amqp0_10 {
+
+/**
+ * Queues the message transfers arriving on a session and hands them
+ * out through a Handler, keeping track of which retrieved transfers
+ * still need to be accepted or released.
+ */
+class IncomingMessages
+{
+  public:
+    typedef boost::shared_ptr<qpid::framing::FrameSet> FrameSetPtr;
+
+    /**
+     * A received transfer as offered to a Handler. Can only be
+     * created by IncomingMessages.
+     */
+    class MessageTransfer
+    {
+      public:
+        const std::string& getDestination();
+        /** Retrieves the transfer; 'message' may be null if no message is wanted. */
+        void retrieve(qpid::messaging::Message* message);
+      private:
+        FrameSetPtr content;
+        IncomingMessages& parent;
+
+        MessageTransfer(FrameSetPtr, IncomingMessages&);
+        friend class IncomingMessages;
+    };
+
+    /** Callback deciding whether an offered transfer is taken (true) or left queued (false). */
+    struct Handler
+    {
+        virtual ~Handler() {}
+        virtual bool accept(MessageTransfer& transfer) = 0;
+    };
+
+    IncomingMessages(qpid::client::AsyncSession session);
+    bool get(Handler& handler, qpid::sys::Duration timeout);
+    //bool get(qpid::messaging::Message& message, qpid::sys::Duration timeout);
+    //bool get(const std::string& destination, qpid::messaging::Message& message, qpid::sys::Duration timeout);
+    void accept();
+    void releaseAll();
+    void releasePending(const std::string& destination);
+  private:
+    typedef std::deque<FrameSetPtr> FrameSetQueue;
+
+    qpid::client::AsyncSession session;
+    //ids of retrieved transfers awaiting accept or release
+    qpid::framing::SequenceSet unaccepted;
+    //queue from which newly arrived framesets are popped
+    boost::shared_ptr< sys::BlockingQueue<FrameSetPtr> > incoming;
+    //transfers popped from 'incoming' but not yet taken by a handler
+    FrameSetQueue received;
+
+    bool process(Handler*, qpid::sys::Duration);
+    void retrieve(FrameSetPtr, qpid::messaging::Message*);
+};
+}}} // namespace qpid::client::amqp0_10
+
+#endif /*!QPID_CLIENT_AMQP0_10_INCOMINGMESSAGES_H*/
Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h Tue Aug 25 17:57:34 2009
@@ -0,0 +1,50 @@
+#ifndef QPID_CLIENT_AMQP0_10_MESSAGESINK_H
+#define QPID_CLIENT_AMQP0_10_MESSAGESINK_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include "qpid/client/AsyncSession.h"
+
+namespace qpid {
+
+namespace messaging {
+class Message;
+}
+
+namespace client {
+namespace amqp0_10 {
+
+/**
+ * Abstract interface through which SenderImpl performs the session
+ * operations for a named sender: declare() when the sender is given
+ * a session, send() for each outgoing message and cancel() when the
+ * sender is cancelled. Presumably the outgoing counterpart of
+ * MessageSource - the concrete command sequences live in the
+ * implementations, which are not shown here.
+ */
+class MessageSink
+{
+ public:
+ virtual ~MessageSink() {}
+ // invoked from SenderImpl::setSession() with the newly assigned session
+ virtual void declare(qpid::client::AsyncSession& session, const std::string& name) = 0;
+ // invoked from SenderImpl::send() with the message to transmit
+ virtual void send(qpid::client::AsyncSession& session, const std::string& name, qpid::messaging::Message& message) = 0;
+ // invoked from SenderImpl::cancel() before the parent session is notified
+ virtual void cancel(qpid::client::AsyncSession& session, const std::string& name) = 0;
+ private:
+};
+}}} // namespace qpid::client::amqp0_10
+
+#endif /*!QPID_CLIENT_AMQP0_10_MESSAGESINK_H*/
Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSource.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSource.h?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSource.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSource.h Tue Aug 25 17:57:34 2009
@@ -0,0 +1,47 @@
+#ifndef QPID_CLIENT_AMQP0_10_MESSAGESOURCE_H
+#define QPID_CLIENT_AMQP0_10_MESSAGESOURCE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include "qpid/client/AsyncSession.h"
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+/**
+ * Abstraction behind which the AMQP 0-10 commands required to
+ * establish (and tear down) an incoming stream of messages from a
+ * given address are hidden.
+ *
+ * ReceiverImpl owns one instance and calls it with its current
+ * session and its destination name.
+ */
+class MessageSource
+{
+ public:
+ virtual ~MessageSource() {}
+ // establishes the incoming stream; invoked from ReceiverImpl::subscribe()
+ virtual void subscribe(qpid::client::AsyncSession& session, const std::string& destination) = 0;
+ // tears the stream down; invoked from ReceiverImpl::cancel()
+ virtual void cancel(qpid::client::AsyncSession& session, const std::string& destination) = 0;
+
+ private:
+};
+}}} // namespace qpid::client::amqp0_10
+
+#endif /*!QPID_CLIENT_AMQP0_10_MESSAGESOURCE_H*/
Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp Tue Aug 25 17:57:34 2009
@@ -0,0 +1,146 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ReceiverImpl.h"
+#include "MessageSource.h"
+#include "SessionImpl.h"
+#include "qpid/messaging/MessageListener.h"
+#include "qpid/messaging/Receiver.h"
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+using qpid::messaging::Receiver;
+
+/**
+ * Accounts for one received message (the argument itself is unused).
+ * With a non-zero capacity the remaining window is decremented and,
+ * once it is at or below half the capacity, a completion is sent on
+ * the session and the window is reset to the full capacity. With a
+ * capacity of zero nothing happens.
+ */
+void ReceiverImpl::received(qpid::messaging::Message&)
+{
+    if (capacity == 0) return;
+
+    --window;
+    //TODO: should this be configurable
+    const bool refreshDue = window <= capacity/2;
+    if (refreshDue) {
+        session.sendCompletion();
+        window = capacity;
+    }
+}
+
+/**
+ * Hands the request to the parent session, identifying this receiver
+ * as the one asking, and reports the parent's answer unchanged.
+ */
+bool ReceiverImpl::get(qpid::messaging::Message& message, qpid::sys::Duration timeout)
+{
+    const bool obtained = parent.get(*this, message, timeout);
+    return obtained;
+}
+
+/**
+ * Value-returning form of get(): returns the message obtained by the
+ * two-argument overload, or throws Receiver::NoMessageAvailable when
+ * that overload reports false.
+ */
+qpid::messaging::Message ReceiverImpl::get(qpid::sys::Duration timeout)
+{
+    qpid::messaging::Message result;
+    if (get(result, timeout)) {
+        return result;
+    }
+    throw Receiver::NoMessageAvailable();
+}
+
+/**
+ * Tries to obtain a message, issuing credit on demand.
+ *
+ * When no capacity is configured and the receiver is not cancelled,
+ * one message of credit is issued first (plus byte credit if the
+ * receiver has not been started). get() is then tried with the given
+ * timeout. If that fails and the receiver is not cancelled, the
+ * destination is flushed through sync(session) and start() is called
+ * to reallocate credit; a final zero-timeout get() decides the result.
+ */
+bool ReceiverImpl::fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout)
+{
+    const bool creditPerFetch = (capacity == 0) && !cancelled;
+    if (creditPerFetch) {
+        session.messageFlow(destination, CREDIT_UNIT_MESSAGE, 1);
+        if (!started) {
+            session.messageFlow(destination, CREDIT_UNIT_BYTE, byteCredit);
+        }
+    }
+
+    if (get(message, timeout)) return true;
+
+    if (!cancelled) {
+        sync(session).messageFlush(destination);
+        start();//reallocate credit
+    }
+    return get(message, 0);
+}
+
+/**
+ * Value-returning form of fetch(): returns the message obtained by
+ * the two-argument overload, or throws Receiver::NoMessageAvailable
+ * when that overload reports false.
+ */
+qpid::messaging::Message ReceiverImpl::fetch(qpid::sys::Duration timeout)
+{
+    qpid::messaging::Message result;
+    if (fetch(result, timeout)) {
+        return result;
+    }
+    throw Receiver::NoMessageAvailable();
+}
+
+/**
+ * Cancels the subscription once: asks the MessageSource to cancel the
+ * destination on the current session, informs the parent session and
+ * then marks this receiver as cancelled. Later calls do nothing.
+ */
+void ReceiverImpl::cancel()
+{
+    if (cancelled) return;
+
+    //TODO: should synchronicity be an optional argument to this call?
+    source->cancel(session, destination);
+    //need to be sure cancel is complete and all incoming
+    //framesets are processed before removing the receiver
+    parent.receiverCancelled(destination);
+    cancelled = true;
+}
+
+/**
+ * Marks the receiver as started and allocates credit for the
+ * destination: sets the flow mode from whether capacity is non-zero,
+ * grants `capacity` messages and `byteCredit` bytes, and resets the
+ * window to the capacity. Does nothing once the receiver is cancelled.
+ */
+void ReceiverImpl::start()
+{
+    if (cancelled) return;
+
+    started = true;
+    // the three commands below are issued in this fixed order
+    session.messageSetFlowMode(destination, capacity > 0);
+    session.messageFlow(destination, CREDIT_UNIT_MESSAGE, capacity);
+    session.messageFlow(destination, CREDIT_UNIT_BYTE, byteCredit);
+    window = capacity;
+}
+
+/**
+ * Stops message flow for the destination and clears the started flag.
+ *
+ * The stop command is only issued while the subscription still
+ * exists: after cancel() the destination has already been cancelled
+ * through the MessageSource, and every other flow-control member
+ * (start(), fetch(), setSession(), setCapacity()) likewise avoids
+ * issuing commands for a cancelled receiver.
+ */
+void ReceiverImpl::stop()
+{
+    if (!cancelled) {
+        session.messageStop(destination);
+    }
+    started = false;
+}
+
+/** Asks the MessageSource to subscribe `destination` on the current session. */
+void ReceiverImpl::subscribe()
+{
+ source->subscribe(session, destination);
+}
+
+/**
+ * Switches this receiver to session `s`. Unless the receiver has been
+ * cancelled, the subscription is re-established on the new session
+ * and, if the receiver was in the started state, start() is called
+ * again so that credit is allocated on the new session as well.
+ */
+void ReceiverImpl::setSession(qpid::client::AsyncSession s)
+{
+    session = s;
+    if (cancelled) return;
+
+    subscribe();
+    //TODO: locking if receiver is to be threadsafe...
+    if (started) {
+        start();
+    }
+}
+
+/**
+ * Changes the message capacity. A value equal to the current one is
+ * ignored. If the receiver is started and not cancelled, flow is
+ * stopped and started again so the new capacity takes effect.
+ */
+void ReceiverImpl::setCapacity(uint32_t c)
+{
+    if (c == capacity) return;
+
+    capacity = c;
+    const bool live = started && !cancelled;
+    if (live) {
+        stop();
+        start();
+    }
+}
+
+/** Stores the listener pointer as given; a null pointer clears it. No ownership is taken here. */
+void ReceiverImpl::setListener(qpid::messaging::MessageListener* l) { listener = l; }
+/** Returns the listener pointer last stored by setListener(), or null if none was set. */
+qpid::messaging::MessageListener* ReceiverImpl::getListener() { return listener; }
+
+/** Returns the destination name this receiver was constructed with. */
+const std::string& ReceiverImpl::getName() const { return destination; }
+
+/**
+ * Creates a receiver named `name` under session implementation `p`,
+ * taking over ownership of the MessageSource held by `s`.
+ *
+ * The receiver starts out with capacity 0, neither started nor
+ * cancelled, without a listener and with an empty window; the byte
+ * credit is fixed at 0xFFFFFFFF. The session member is left default
+ * constructed until setSession() is called.
+ */
+ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, std::auto_ptr<MessageSource> s)
+    : parent(p),
+      source(s),
+      destination(name),
+      byteCredit(0xFFFFFFFF),
+      capacity(0),
+      started(false),
+      cancelled(false),
+      listener(0),
+      window(0)
+{
+}
+
+
+}}} // namespace qpid::client::amqp0_10
Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h Tue Aug 25 17:57:34 2009
@@ -0,0 +1,76 @@
+#ifndef QPID_CLIENT_AMQP0_10_RECEIVERIMPL_H
+#define QPID_CLIENT_AMQP0_10_RECEIVERIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/ReceiverImpl.h"
+#include "qpid/client/AsyncSession.h"
+#include "qpid/sys/Time.h"
+#include <memory>
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+class MessageSource;
+class SessionImpl;
+
+/**
+ * A receiver implementation based on an AMQP 0-10 subscription.
+ *
+ * Message credit is granted either as a window of `capacity`
+ * messages, topped up from received(), or - when the capacity is 0 -
+ * one message at a time from fetch().
+ *
+ * NOTE(review): MessageSource is only forward declared in this header
+ * while `source` is a std::auto_ptr to it; code that destroys a
+ * ReceiverImpl needs the complete type in scope - confirm.
+ */
+class ReceiverImpl : public qpid::messaging::ReceiverImpl
+{
+ public:
+
+ ReceiverImpl(SessionImpl& parent, const std::string& name, std::auto_ptr<MessageSource> source);
+
+ // get(): takes a message via the parent session without issuing credit.
+ // fetch(): like get(), but issues credit on demand and flushes on timeout.
+ // The value-returning overloads throw Receiver::NoMessageAvailable
+ // where the bool overloads would return false.
+ bool get(qpid::messaging::Message& message, qpid::sys::Duration timeout);
+ qpid::messaging::Message get(qpid::sys::Duration timeout);
+ bool fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout);
+ qpid::messaging::Message fetch(qpid::sys::Duration timeout);
+ void cancel();
+ void start();
+ void stop();
+ void subscribe();
+ void setSession(qpid::client::AsyncSession s);
+ const std::string& getName() const;
+ void setCapacity(uint32_t);
+ void setListener(qpid::messaging::MessageListener* listener);
+ qpid::messaging::MessageListener* getListener();
+ void received(qpid::messaging::Message& message);
+ private:
+ // session implementation that get() is delegated to and that is told
+ // when this receiver is cancelled
+ SessionImpl& parent;
+ // issues the subscribe/cancel commands for `destination`
+ const std::auto_ptr<MessageSource> source;
+ // subscription name; also the value returned by getName()
+ const std::string destination;
+ // byte credit granted with each allocation (0xFFFFFFFF, set by the constructor)
+ const uint32_t byteCredit;
+
+ // message credit window; 0 means credit is issued per fetch()
+ uint32_t capacity;
+ // current session; replaced by setSession()
+ qpid::client::AsyncSession session;
+ // set by start(), cleared by stop()
+ bool started;
+ // set by cancel(); no member in ReceiverImpl.cpp resets it
+ bool cancelled;
+ // stored and returned only; not invoked from ReceiverImpl.cpp
+ qpid::messaging::MessageListener* listener;
+ // counted down by received(); reset to `capacity` by start() and received()
+ uint32_t window;
+};
+
+}}} // namespace qpid::client::amqp0_10
+
+#endif /*!QPID_CLIENT_AMQP0_10_RECEIVERIMPL_H*/
Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp Tue Aug 25 17:57:34 2009
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "SenderImpl.h"
+#include "MessageSink.h"
+#include "SessionImpl.h"
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+/**
+ * Creates a sender named `_name` under session implementation
+ * `_parent`, taking over ownership of the MessageSink held by `_sink`.
+ * The session member is left default constructed until setSession()
+ * is called.
+ */
+SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name, std::auto_ptr<MessageSink> _sink)
+    : parent(_parent),
+      name(_name),
+      sink(_sink)
+{
+}
+
+/** Passes message `m` to the MessageSink together with the current session and this sender's name. */
+void SenderImpl::send(qpid::messaging::Message& m)
+{
+ sink->send(session, name, m);
+}
+
+/**
+ * Cancels this sender: the MessageSink is asked to cancel first, then
+ * the parent session is told that the sender with this name is gone.
+ * NOTE(review): there is no guard against repeated calls, unlike
+ * ReceiverImpl::cancel() - confirm that callers invoke this only once.
+ */
+void SenderImpl::cancel()
+{
+ sink->cancel(session, name);
+ parent.senderCancelled(name);
+}
+
+/**
+ * Switches this sender to session `s` and has the MessageSink declare
+ * this sender's name on the new session.
+ */
+void SenderImpl::setSession(qpid::client::AsyncSession s)
+{
+ session = s;
+ // declare against the member, i.e. the session that was just assigned
+ sink->declare(session, name);
+}
+
+}}} // namespace qpid::client::amqp0_10
Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h Tue Aug 25 17:57:34 2009
@@ -0,0 +1,58 @@
+#ifndef QPID_CLIENT_AMQP0_10_SENDERIMPL_H
+#define QPID_CLIENT_AMQP0_10_SENDERIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/SenderImpl.h"
+#include "qpid/client/AsyncSession.h"
+#include <memory>
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+class MessageSink;
+class SessionImpl;
+
+/**
+ * A sender implementation for AMQP 0-10 that forwards declare, send
+ * and cancel to a MessageSink, passing its current session and name.
+ *
+ * NOTE(review): MessageSink is only forward declared in this header
+ * while `sink` is a std::auto_ptr to it; code that destroys a
+ * SenderImpl needs the complete type in scope - confirm.
+ */
+class SenderImpl : public qpid::messaging::SenderImpl
+{
+ public:
+ SenderImpl(SessionImpl& parent, const std::string& name, std::auto_ptr<MessageSink> sink);
+ void send(qpid::messaging::Message&);
+ void cancel();
+ void setSession(qpid::client::AsyncSession);
+
+ private:
+ // session implementation notified from cancel()
+ SessionImpl& parent;
+ // name passed to every MessageSink call
+ const std::string name;
+ // owned sink that carries out declare/send/cancel
+ std::auto_ptr<MessageSink> sink;
+
+ // current session; replaced by setSession()
+ qpid::client::AsyncSession session;
+ // NOTE(review): the two strings below are not referenced by any member
+ // function defined in SenderImpl.cpp - presumably leftovers; confirm
+ std::string destination;
+ std::string routingKey;
+};
+}}} // namespace qpid::client::amqp0_10
+
+#endif /*!QPID_CLIENT_AMQP0_10_SENDERIMPL_H*/
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org