You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2009/08/25 19:57:37 UTC

svn commit: r807731 [3/3] - in /qpid/trunk/qpid/cpp: ./ examples/ examples/messaging/ include/qpid/client/amqp0_10/ include/qpid/messaging/ src/ src/qpid/client/ src/qpid/client/amqp0_10/ src/qpid/messaging/ src/tests/

Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Tue Aug 25 17:57:34 2009
@@ -0,0 +1,281 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/amqp0_10/SessionImpl.h"
+#include "qpid/client/amqp0_10/ReceiverImpl.h"
+#include "qpid/client/amqp0_10/SenderImpl.h"
+#include "qpid/client/amqp0_10/MessageSource.h"
+#include "qpid/client/amqp0_10/MessageSink.h"
+#include "qpid/client/PrivateImplRef.h"
+#include "qpid/Exception.h"
+#include "qpid/log/Statement.h"
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/Filter.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/MessageListener.h"
+#include "qpid/messaging/Sender.h"
+#include "qpid/messaging/Receiver.h"
+#include "qpid/messaging/Session.h"
+#include "qpid/framing/reply_exceptions.h"
+#include <boost/format.hpp>
+#include <boost/function.hpp>
+#include <boost/intrusive_ptr.hpp>
+
+using qpid::messaging::Filter;
+using qpid::messaging::Sender;
+using qpid::messaging::Receiver;
+using qpid::messaging::VariantMap;
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+//Stores a copy of the client session 's'; 'incoming' is then constructed from that stored copy.
+SessionImpl::SessionImpl(qpid::client::Session s) : session(s), incoming(session) {}
+
+//Calls incoming.accept() and then commits the transaction, both while holding the session lock.
+void SessionImpl::commit()
+{
+    qpid::sys::Mutex::ScopedLock l(lock);
+    incoming.accept();
+    session.txCommit();
+}
+//Stops every receiver, releases all of 'incoming', rolls the transaction back, then restarts the receivers.
+void SessionImpl::rollback()
+{
+    qpid::sys::Mutex::ScopedLock l(lock);
+    for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) i->second.stop();
+    //ensure that stop has been processed and all previously sent
+    //messages are available for release:                   
+    session.sync();
+    incoming.releaseAll();
+    session.txRollback();    
+    for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) i->second.start();//restart only once the rollback has been issued
+}
+//Calls incoming.accept() under the session lock; no transaction command is issued here.
+void SessionImpl::acknowledge()
+{
+    qpid::sys::Mutex::ScopedLock l(lock);
+    incoming.accept();
+}
+//Rejects 'm' on the session, treating the value held in its internal id as the transfer's command id.
+void SessionImpl::reject(qpid::messaging::Message& m)
+{
+    qpid::sys::Mutex::ScopedLock l(lock);
+    //TODO: how do I get the id of the original transfer command? think this through some more...
+    SequenceNumber id(static_cast<uint32_t>(reinterpret_cast<size_t>(m.getInternalId())));//via size_t: void* -> uint32_t directly is ill-formed with 64-bit pointers
+    SequenceSet set;
+    set.add(id);
+    session.messageReject(set);
+}
+//Closes the underlying client session; note that the session lock is not taken here.
+void SessionImpl::close()
+{
+    session.close();
+}
+//Copies recognised entries of 'options' into 'settings'; only "auto_acknowledge" is handled so far.
+void translate(const VariantMap& options, SubscriptionSettings& settings)
+{
+    //TODO: fill this out
+    VariantMap::const_iterator i = options.find("auto_acknowledge");
+    if (i != options.end()) {
+        settings.autoAck = i->second.asInt32();//left untouched when the option is absent
+    }
+}
+//Fetches the implementation object behind handle 't' and dynamically casts it to S (callers test the result for null).
+template <class T, class S> boost::intrusive_ptr<S> getImplPtr(T& t)
+{
+    return boost::dynamic_pointer_cast<S>(qpid::client::PrivateImplRef<T>::get(t));
+}
+//Makes 'key' unique within 'map': if it is taken, tries key_2, key_3, ... and stores the first free name.
+template <class T> void getFreeKey(std::string& key, T& map)
+{
+    std::string candidate = key;
+    //numbering starts at 2: the unsuffixed key counts as the first
+    for (int suffix = 2; map.find(candidate) != map.end(); ++suffix) {
+        candidate = (boost::format("%1%_%2%") % key % suffix).str();
+    }
+    key = candidate;
+}
+//Resolves 'address' to a sink, wraps it in a SenderImpl registered under a unique name, and returns the handle.
+Sender SessionImpl::createSender(const qpid::messaging::Address& address, const VariantMap& options)
+{ 
+    qpid::sys::Mutex::ScopedLock l(lock);
+    std::auto_ptr<MessageSink> sink = resolver.resolveSink(session, address, options);
+    std::string name = address;
+    getFreeKey(name, senders);//the same address may be used by several senders
+    Sender sender(new SenderImpl(*this, name, sink));
+    getImplPtr<Sender, SenderImpl>(sender)->setSession(session);
+    senders[name] = sender;
+    return sender;
+}
+Receiver SessionImpl::createReceiver(const qpid::messaging::Address& address, const VariantMap& options)
+{ //unfiltered overload: forwards to addReceiver() with a null filter pointer
+    return addReceiver(address, 0, options);
+}
+Receiver SessionImpl::createReceiver(const qpid::messaging::Address& address, const Filter& filter, const VariantMap& options)
+{ //filtered overload: forwards to addReceiver() with the address of 'filter'
+    return addReceiver(address, &filter, options);
+}
+//Shared body of both createReceiver() overloads; 'filter' may be null. Mirrors createSender() for sources.
+Receiver SessionImpl::addReceiver(const qpid::messaging::Address& address, const Filter* filter, const VariantMap& options)
+{
+    qpid::sys::Mutex::ScopedLock l(lock);
+    std::auto_ptr<MessageSource> source = resolver.resolveSource(session, address, filter, options);
+    std::string name = address;
+    getFreeKey(name, receivers);//the same address may be used by several receivers
+    Receiver receiver(new ReceiverImpl(*this, name, source));
+    getImplPtr<Receiver, ReceiverImpl>(receiver)->setSession(session);
+    receivers[name] = receiver;
+    return receiver;
+}
+//Declares an exclusive, auto-delete queue named "<baseName>_<session name>" and returns its address.
+qpid::messaging::Address SessionImpl::createTempQueue(const std::string& baseName)
+{
+    std::string name = baseName + std::string("_") + session.getId().getName();
+    session.queueDeclare(arg::queue=name, arg::exclusive=true, arg::autoDelete=true);
+    return qpid::messaging::Address(name);
+}
+//Returns the amqp0_10 SessionImpl behind handle 's'; throws qpid::Exception if 's' wraps another implementation.
+SessionImpl& SessionImpl::convert(qpid::messaging::Session& s)
+{
+    boost::intrusive_ptr<SessionImpl> impl = getImplPtr<qpid::messaging::Session, SessionImpl>(s);
+    if (!impl) {
+        throw qpid::Exception(QPID_MSG("Configuration error; require qpid::client::amqp0_10::SessionImpl"));
+    }
+    return *impl;//the local intrusive_ptr goes out of scope on return; the reference relies on 's' still holding the impl
+}
+
+namespace {
+//Adapts a boost::function callback to the IncomingMessages::Handler interface.
+struct IncomingMessageHandler : IncomingMessages::Handler
+{
+    typedef boost::function1<bool, IncomingMessages::MessageTransfer&> Callback;
+    Callback callback;
+
+    IncomingMessageHandler(Callback c) : callback(c) {}
+    //forwards the transfer to the callback and returns its verdict unchanged
+    bool accept(IncomingMessages::MessageTransfer& transfer)
+    {
+        return callback(transfer);
+    }
+};
+
+}
+//Delivers 'transfer' to 'receiver' if it is addressed to that receiver; returns whether it was taken.
+bool SessionImpl::accept(ReceiverImpl* receiver, 
+                         qpid::messaging::Message* message, 
+                         bool isDispatch, 
+                         IncomingMessages::MessageTransfer& transfer)
+{
+    //addressed to some other receiver: leave the transfer alone
+    if (receiver->getName() != transfer.getDestination()) return false;
+
+    transfer.retrieve(message);
+    if (isDispatch) {
+        //the listener callback runs with the session lock released
+        qpid::sys::Mutex::ScopedUnlock u(lock);
+        qpid::messaging::MessageListener* listener = receiver->getListener();
+        if (listener) listener->received(*message);
+    }
+    receiver->received(*message);
+    return true;
+}
+//Hands 'transfer' to the receiver registered under its destination name; false if nobody takes it.
+bool SessionImpl::acceptAny(qpid::messaging::Message* message, bool isDispatch, IncomingMessages::MessageTransfer& transfer)
+{
+    Receivers::iterator found = receivers.find(transfer.getDestination());
+    if (found == receivers.end()) {
+        QPID_LOG(error, "Received message for unknown destination " << transfer.getDestination());
+        return false;
+    }
+    boost::intrusive_ptr<ReceiverImpl> target = getImplPtr<Receiver, ReceiverImpl>(found->second);
+    if (!target || (isDispatch && !target->getListener())) return false;//dispatch needs a listener to call
+    return accept(target.get(), message, isDispatch, transfer);
+}
+//Takes the session lock and asks 'incoming' to offer messages to 'handler' for up to 'timeout'.
+bool SessionImpl::getIncoming(IncomingMessages::Handler& handler, qpid::sys::Duration timeout)
+{
+    qpid::sys::Mutex::ScopedLock l(lock);
+    return incoming.get(handler, timeout);
+}
+//Offers incoming messages to acceptAny() in dispatch mode, so only receivers with a listener take them.
+bool SessionImpl::dispatch(qpid::sys::Duration timeout)
+{
+    qpid::messaging::Message message;
+    IncomingMessageHandler handler(boost::bind(&SessionImpl::acceptAny, this, &message, true, _1));
+    return getIncoming(handler, timeout);
+}
+//Offers incoming messages to accept() for 'receiver' only (non-dispatch), filling 'message' on success.
+bool SessionImpl::get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::sys::Duration timeout)
+{
+    IncomingMessageHandler handler(boost::bind(&SessionImpl::accept, this, &receiver, &message, false, _1));
+    return getIncoming(handler, timeout);
+}
+//Offers incoming messages to acceptAny() in non-dispatch mode, so any registered receiver may take one.
+bool SessionImpl::fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout)
+{
+    IncomingMessageHandler handler(boost::bind(&SessionImpl::acceptAny, this, &message, false, _1));
+    return getIncoming(handler, timeout);
+}
+//Value-returning variant of fetch(): throws Receiver::NoMessageAvailable when the other overload returns false.
+qpid::messaging::Message SessionImpl::fetch(qpid::sys::Duration timeout) 
+{
+    qpid::messaging::Message result;
+    if (!fetch(result, timeout)) throw Receiver::NoMessageAvailable();
+    return result;
+}
+//Removes receiver 'name' from the registry, then syncs the session and calls incoming.releasePending(name).
+void SessionImpl::receiverCancelled(const std::string& name)
+{
+    {
+        qpid::sys::Mutex::ScopedLock l(lock);
+        receivers.erase(name);
+    }
+    session.sync();//NOTE(review): sync and releasePending run without the lock - presumably deliberate; confirm
+    incoming.releasePending(name);
+}
+//Removes sender 'name' from the registry under the session lock.
+void SessionImpl::senderCancelled(const std::string& name)
+{
+    qpid::sys::Mutex::ScopedLock l(lock);
+    senders.erase(name);
+}
+//sync() and flush() forward straight to the underlying client session; neither takes the session lock.
+void SessionImpl::sync()
+{
+    session.sync();
+}
+
+void SessionImpl::flush()
+{
+    session.flush();
+}
+//Both getLastConfirmed*() functions currently always return a null pointer.
+void* SessionImpl::getLastConfirmedSent()
+{
+    return 0;
+}
+
+void* SessionImpl::getLastConfirmedAcknowledged()
+{
+    return 0;
+}
+
+}}} // namespace qpid::client::amqp0_10

Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h Tue Aug 25 17:57:34 2009
@@ -0,0 +1,107 @@
+#ifndef QPID_CLIENT_AMQP0_10_SESSIONIMPL_H
+#define QPID_CLIENT_AMQP0_10_SESSIONIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/SessionImpl.h"
+#include "qpid/messaging/Variant.h"
+#include "qpid/client/Session.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/amqp0_10/AddressResolution.h"
+#include "qpid/client/amqp0_10/IncomingMessages.h"
+#include "qpid/sys/Mutex.h"
+
+namespace qpid {
+
+namespace messaging {
+class Address;
+class Filter;
+class Message;
+class Receiver;
+class Sender;
+class Session;
+}
+
+namespace client {
+namespace amqp0_10 {
+
+class ReceiverImpl;
+class SenderImpl;
+
+/**
+ * Implementation of the protocol independent Session interface using
+ * AMQP 0-10. Wraps a qpid::client::Session and tracks the senders/receivers created through it.
+ */
+class SessionImpl : public qpid::messaging::SessionImpl
+{
+  public:
+    SessionImpl(qpid::client::Session);
+    void commit();
+    void rollback();
+    void acknowledge();
+    void reject(qpid::messaging::Message&);
+    void close();
+    void sync();
+    void flush();
+    qpid::messaging::Address createTempQueue(const std::string& baseName);
+    qpid::messaging::Sender createSender(const qpid::messaging::Address& address,
+                                         const qpid::messaging::VariantMap& options);
+    qpid::messaging::Receiver createReceiver(const qpid::messaging::Address& address,
+                                             const qpid::messaging::VariantMap& options);
+    qpid::messaging::Receiver createReceiver(const qpid::messaging::Address& address, 
+                                             const qpid::messaging::Filter& filter,
+                                             const qpid::messaging::VariantMap& options);
+
+    void* getLastConfirmedSent();
+    void* getLastConfirmedAcknowledged();
+    //fetch takes a message for any receiver; dispatch delivers one to a receiver's listener
+    bool fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout);
+    qpid::messaging::Message fetch(qpid::sys::Duration timeout);
+    bool dispatch(qpid::sys::Duration timeout);
+    //takes a message for the given receiver only
+    bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::sys::Duration timeout);    
+    //remove the named receiver/sender from this session's registry
+    void receiverCancelled(const std::string& name);
+    void senderCancelled(const std::string& name);
+    
+    static SessionImpl& convert(qpid::messaging::Session&);
+    //the wrapped AMQP 0-10 session; public, unlike the remaining data members
+    qpid::client::Session session;
+  private:
+    typedef std::map<std::string, qpid::messaging::Receiver> Receivers;
+    typedef std::map<std::string, qpid::messaging::Sender> Senders;
+
+    qpid::sys::Mutex lock;
+    AddressResolution resolver;
+    IncomingMessages incoming;//constructed from 'session', which is declared (and so initialised) before it
+    Receivers receivers;
+    Senders senders;
+
+    qpid::messaging::Receiver addReceiver(const qpid::messaging::Address& address, 
+                                          const qpid::messaging::Filter* filter, 
+                                          const qpid::messaging::VariantMap& options);
+    bool acceptAny(qpid::messaging::Message*, bool, IncomingMessages::MessageTransfer&);
+    bool accept(ReceiverImpl*, qpid::messaging::Message*, bool, IncomingMessages::MessageTransfer&);
+    bool getIncoming(IncomingMessages::Handler& handler, qpid::sys::Duration timeout);
+};
+}}} // namespace qpid::client::amqp0_10
+
+#endif  /*!QPID_CLIENT_AMQP0_10_SESSIONIMPL_H*/

Added: qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp Tue Aug 25 17:57:34 2009
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/Address.h"
+
+namespace qpid {
+namespace client {
+}
+
+namespace messaging {
+//An Address is a string value plus an optional type; conversions and truth tests look at the value only.
+Address::Address() {}
+Address::Address(const std::string& address) : value(address) {}
+Address::Address(const std::string& address, const std::string& t) : value(address), type(t) {}
+Address::operator const std::string&() const { return value; }
+const std::string& Address::toStr() const { return value; }
+Address::operator bool() const { return !value.empty(); }
+bool Address::operator !() const { return value.empty(); }
+//Separator written between an address's type and its value when streaming.
+const std::string TYPE_SEPARATOR(":");
+//Streams 'address' as "type:value", or just "value" when the type is empty.
+std::ostream& operator<<(std::ostream& out, const Address& address)
+{
+    if (!address.type.empty()) {
+        out << address.type;
+        out << TYPE_SEPARATOR;
+    }
+    out << address.value;
+    return out;
+}
+
+}} // namespace qpid::messaging

Added: qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp Tue Aug 25 17:57:34 2009
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/Connection.h"
+#include "qpid/messaging/ConnectionImpl.h"
+#include "qpid/messaging/Session.h"
+#include "qpid/messaging/SessionImpl.h"
+#include "qpid/client/PrivateImplRef.h"
+#include "qpid/client/amqp0_10/ConnectionImpl.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace client {
+
+typedef PrivateImplRef<qpid::messaging::Connection> PI;
+
+}
+
+namespace messaging {
+
+using qpid::client::PI;
+//Factory: returns a Connection handle around a new amqp0_10::ConnectionImpl built from 'url' and 'options'.
+Connection Connection::open(const std::string& url, const Variant::Map& options)
+{
+    //only support amqp 0-10 at present
+    Connection connection(new qpid::client::amqp0_10::ConnectionImpl(url, options));
+    return connection;
+}
+//Handle boilerplate: construction, copy, assignment and destruction are all delegated to PrivateImplRef (PI).
+Connection::Connection(ConnectionImpl* impl) { PI::ctor(*this, impl); }
+Connection::Connection(const Connection& c) : qpid::client::Handle<ConnectionImpl>() { PI::copy(*this, c); }
+Connection& Connection::operator=(const Connection& c) { return PI::assign(*this, c); }
+Connection::~Connection() { PI::dtor(*this); }
+//the operations below forward to the implementation object
+void Connection::close() { impl->close(); }
+Session Connection::newSession() { return impl->newSession(); }
+//exception thrown by the option-string parsers below; carries only the message text
+InvalidOptionString::InvalidOptionString(const std::string& msg) : Exception(msg) {}
+//Stores one "name=value" token in 'out'; throws InvalidOptionString unless 'in' holds exactly one '=' (empty name/value allowed).
+void parseKeyValuePair(const std::string& in, Variant::Map& out)
+{
+    const std::string::size_type eq = in.find('=');
+    const bool malformed = eq == std::string::npos || in.find('=', eq + 1) != std::string::npos;
+    if (malformed) {
+        throw InvalidOptionString(QPID_MSG("Cannot parse name-value pair from " << in));
+    }
+    out[in.substr(0, eq)] = in.substr(eq + 1);
+}
+
+//Splits 'in' on '&' and parses every piece as a name=value pair into 'out'.
+//An empty piece (empty input, a leading, trailing or doubled '&') has no '=' and
+//therefore makes parseKeyValuePair() throw InvalidOptionString.
+void parseOptionString(const std::string& in, Variant::Map& out)
+{
+    std::string::size_type begin = 0;
+    for (std::string::size_type amp = in.find('&');
+         amp != std::string::npos;
+         amp = in.find('&', begin)) {
+        parseKeyValuePair(in.substr(begin, amp - begin), out);
+        begin = amp + 1;//a match is always < in.size(), so begin <= in.size() for substr below
+    }
+    //whatever follows the last '&' (or the whole string if there is none) is the final pair
+    parseKeyValuePair(in.substr(begin), out);
+}
+//Convenience overload: parses 'in' into a fresh map and returns it by value.
+Variant::Map parseOptionString(const std::string& in)
+{
+    Variant::Map map;    
+    parseOptionString(in, map);
+    return map;
+}
+
+}} // namespace qpid::messaging

Added: qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionImpl.h?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionImpl.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionImpl.h Tue Aug 25 17:57:34 2009
@@ -0,0 +1,45 @@
+#ifndef QPID_MESSAGING_CONNECTIONIMPL_H
+#define QPID_MESSAGING_CONNECTIONIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include "qpid/RefCounted.h"
+
+namespace qpid {
+namespace client {
+}
+
+namespace messaging {
+
+class Session;
+//Abstract, ref-counted interface that a concrete connection implementation has to provide.
+class ConnectionImpl : public virtual qpid::RefCounted
+{
+  public:
+    virtual ~ConnectionImpl() {}
+    virtual void close() = 0;
+    virtual Session newSession() = 0;
+  private:
+};
+}} // namespace qpid::messaging
+
+#endif  /*!QPID_MESSAGING_CONNECTIONIMPL_H*/

Added: qpid/trunk/qpid/cpp/src/qpid/messaging/Filter.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Filter.cpp?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Filter.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Filter.cpp Tue Aug 25 17:57:34 2009
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/Filter.h"
+
+namespace qpid {
+namespace client {
+}
+
+namespace messaging {
+//A Filter is a type name plus one or two patterns, kept in 'patterns' in argument order.
+Filter::Filter(std::string t, std::string pattern) : type(t) { patterns.push_back(pattern); }
+Filter::Filter(std::string t, std::string pattern1, std::string pattern2) : type(t) 
+{ 
+    patterns.push_back(pattern1); 
+    patterns.push_back(pattern2); 
+}
+//string constants for the filter type
+const std::string Filter::WILDCARD("WILDCARD");
+const std::string Filter::EXACT_MATCH("EXACT_MATCH");
+
+}} // namespace qpid::messaging

Added: qpid/trunk/qpid/cpp/src/qpid/messaging/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Message.cpp?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Message.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Message.cpp Tue Aug 25 17:57:34 2009
@@ -0,0 +1,325 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/Codec.h"
+#include "qpid/messaging/MessageContent.h"
+#include "qpid/messaging/Variant.h"
+
+namespace qpid {
+namespace client {
+}
+
+namespace messaging {
+
+namespace {
+const std::string EMPTY_STRING = "";
+}
+//MessageContent implementation that also holds the reply-to, subject, content type, headers and internal id.
+struct MessageImpl : MessageContent
+{
+    Address replyTo;
+    std::string subject;
+    std::string contentType;
+    VariantMap headers;
+
+    std::string bytes;
+    Variant content;//used only for LIST and MAP
+    VariantType type;//if LIST, MAP content holds the value; if VOID bytes holds the value
+
+    void* internalId;//opaque value set through setInternalId(); 0 after construction
+
+    MessageImpl(const std::string& c);
+    MessageImpl(const char* chars, size_t count);
+
+    void setReplyTo(const Address& d);
+    const Address& getReplyTo() const;
+    
+    void setSubject(const std::string& s);
+    const std::string& getSubject() const;
+    
+    void setContentType(const std::string& s);
+    const std::string& getContentType() const;
+    
+    const VariantMap& getHeaders() const;
+    VariantMap& getHeaders();
+    
+    void setBytes(const std::string& bytes);
+    void setBytes(const char* chars, size_t count);
+    const std::string& getBytes() const;
+    std::string& getBytes();
+
+    void setInternalId(void*);
+    void* getInternalId();
+
+    bool isVoid() const;
+
+    const std::string& asString() const;
+    std::string& asString();
+
+    const char* asChars() const;
+    size_t size() const;
+    //the non-const asMap()/asList() turn VOID content into an empty map/list first
+    const Variant::Map& asMap() const;
+    Variant::Map& asMap();
+    bool isMap() const;
+
+    const Variant::List& asList() const;
+    Variant::List& asList();
+    bool isList() const;
+
+    void clear();
+    //encode: structured content -> bytes; decode: bytes -> structured content
+    void encode(Codec& codec);
+    void decode(Codec& codec);
+
+    Variant& operator[](const std::string&);
+
+    std::ostream& print(std::ostream& out) const;
+
+    //operator<< for variety of types...
+    MessageContent& operator<<(const std::string&);
+    MessageContent& operator<<(const char*);
+    MessageContent& operator<<(bool);
+    MessageContent& operator<<(int8_t);
+    MessageContent& operator<<(int16_t);
+    MessageContent& operator<<(int32_t);
+    MessageContent& operator<<(int64_t);
+    MessageContent& operator<<(uint8_t);
+    MessageContent& operator<<(uint16_t);
+    MessageContent& operator<<(uint32_t);
+    MessageContent& operator<<(uint64_t);
+    MessageContent& operator<<(double);
+    MessageContent& operator<<(float);
+
+    //assignment from string, map and list
+    MessageContent& operator=(const std::string&);
+    MessageContent& operator=(const char*);
+    MessageContent& operator=(const Variant::Map&);
+    MessageContent& operator=(const Variant::List&);
+    //shared implementation of the operator<< overloads above
+    template <class T> MessageContent& append(T& t);
+};
+//Both constructors start with raw (VOID) content taken from the argument and a null internal id.
+MessageImpl::MessageImpl(const std::string& c) : bytes(c), type(VOID), internalId(0) {}
+MessageImpl::MessageImpl(const char* chars, size_t count) : bytes(chars, count), type(VOID), internalId(0) {}
+//plain field accessors follow
+void MessageImpl::setReplyTo(const Address& d) { replyTo = d; }
+const Address& MessageImpl::getReplyTo() const { return replyTo; }
+
+void MessageImpl::setSubject(const std::string& s) { subject = s; }
+const std::string& MessageImpl::getSubject() const { return subject; }
+
+void MessageImpl::setContentType(const std::string& s) { contentType = s; }
+const std::string& MessageImpl::getContentType() const { return contentType; }
+
+const VariantMap& MessageImpl::getHeaders() const { return headers; }
+VariantMap& MessageImpl::getHeaders() { return headers; }
+//setBytes copies its input before clear(): the input may alias 'bytes' itself, e.g. setBytes(getBytes()).
+//should these methods be on MessageContent?
+void MessageImpl::setBytes(const std::string& c) { std::string copy(c); clear(); bytes.swap(copy); }
+void MessageImpl::setBytes(const char* chars, size_t count) { std::string copy(chars, count); clear(); bytes.swap(copy); }
+const std::string& MessageImpl::getBytes() const { return bytes; }
+std::string& MessageImpl::getBytes() { return bytes; }
+
+//Map-style access via the non-const asMap(), which first turns VOID content into an empty map.
+Variant& MessageImpl::operator[](const std::string& key) { return asMap()[key]; }
+//Writes the content to 'out': the map or list when the type says so, otherwise the raw bytes.
+std::ostream& MessageImpl::print(std::ostream& out) const
+{
+    if (type == MAP) {
+        return out << content.asMap();
+    } else if (type == LIST) {
+        return out << content.asList();
+    } else {
+        return out << bytes;
+    }
+}
+//Backs every operator<<: VOID content gets the streamed text of 't' appended, a list gets a new element.
+template <class T> MessageContent& MessageImpl::append(T& t)
+{
+    if (type == VOID) {
+        //TODO: this is inefficient, probably want to hold on to the stream object
+        std::stringstream s;
+        s << bytes;
+        s << t;
+        bytes = s.str();
+    } else if (type == LIST) {
+        content.asList().push_back(Variant(t));
+    } else {
+        throw InvalidConversion("<< operator only valid on strings and lists");//i.e. the content is a map
+    }
+    return *this;
+}
+//One operator<< per supported value type; each forwards to append().
+MessageContent& MessageImpl::operator<<(const std::string& v) { return append(v); }
+MessageContent& MessageImpl::operator<<(const char* v) { return append(v); }
+MessageContent& MessageImpl::operator<<(bool v) { return append(v); }
+MessageContent& MessageImpl::operator<<(int8_t v) { return append(v); }
+MessageContent& MessageImpl::operator<<(int16_t v) { return append(v); }
+MessageContent& MessageImpl::operator<<(int32_t v) { return append(v); }
+MessageContent& MessageImpl::operator<<(int64_t v) { return append(v); }
+MessageContent& MessageImpl::operator<<(uint8_t v) { return append(v); }
+MessageContent& MessageImpl::operator<<(uint16_t v) { return append(v); }
+MessageContent& MessageImpl::operator<<(uint32_t v) { return append(v); }
+MessageContent& MessageImpl::operator<<(uint64_t v) { return append(v); }
+MessageContent& MessageImpl::operator<<(double v) { return append(v); }
+MessageContent& MessageImpl::operator<<(float v) { return append(v); }
+MessageContent& MessageImpl::operator=(const std::string& s) 
+{ 
+    //Replaces the content with the raw string 's'.
+    type = VOID;
+    bytes = s; 
+    content.reset();//drop any stale map/list: encode() tests content's type and would overwrite bytes with it
+    return *this;
+}
+MessageContent& MessageImpl::operator=(const char* c) 
+{ 
+    return operator=(std::string(c));//share the logic above so both overloads reset 'content'
+}
+MessageContent& MessageImpl::operator=(const Variant::Map& m)
+{ 
+    type = MAP;//structured content lives in 'content'; 'bytes' is left as it was
+    content = m; 
+    return *this;
+}
+
+MessageContent& MessageImpl::operator=(const Variant::List& l)
+{ 
+    type = LIST;//structured content lives in 'content'; 'bytes' is left as it was
+    content = l; 
+    return *this;
+}
+//Replaces 'bytes' with the codec's encoding of 'content'; does nothing when content's own type is VOID.
+void MessageImpl::encode(Codec& codec)
+{
+    if (content.getType() != VOID) {
+        bytes = EMPTY_STRING;
+        codec.encode(content, bytes);
+    }
+}
+//Has the codec decode 'bytes' into 'content' and sets 'type' from the result; 'bytes' is not modified here.
+void MessageImpl::decode(Codec& codec) 
+{
+    codec.decode(bytes, content);    
+    if (content.getType() == MAP) type = MAP;
+    else if (content.getType() == LIST) type = LIST;
+    else type = VOID;//TODO: what if codec set some type other than map or list??
+}
+//The internal id is an opaque pointer-sized value stored and returned unchanged.
+void MessageImpl::setInternalId(void* i) { internalId = i; }
+void* MessageImpl::getInternalId() { return internalId; }
+//VOID means the content is the raw 'bytes' rather than a map or list
+bool MessageImpl::isVoid() const { return type == VOID; }
+//Returns the raw bytes for VOID content; for a map or list it asks 'content' for a string instead.
+const std::string& MessageImpl::asString() const 
+{ 
+    if (isVoid()) return getBytes();
+    else return content.getString();//will throw an error
+}
+std::string& MessageImpl::asString()
+{ 
+    if (isVoid()) return getBytes();
+    else return content.getString();//will throw an error
+}
+//asChars() exposes the raw bytes (not necessarily null-terminated; pair with size()); throws for map/list content.
+const char* MessageImpl::asChars() const
+{
+    if (!isVoid()) throw InvalidConversion("Content is of structured type.");
+    return bytes.data();
+}
+size_t MessageImpl::size() const
+{
+    return bytes.size();//always the size of 'bytes', whatever the content type
+}
+//The const overload forwards to content.asMap() as is; the non-const one first turns VOID content into an empty map.
+const Variant::Map& MessageImpl::asMap() const { return content.asMap(); }
+Variant::Map& MessageImpl::asMap()
+{ 
+    if (isVoid()) {
+        content = Variant::Map(); 
+        type = MAP; 
+    }
+    return content.asMap(); 
+}
+bool MessageImpl::isMap() const { return type == MAP; }
+//The const overload forwards to content.asList() as is; the non-const one first turns VOID content into an empty list.
+const Variant::List& MessageImpl::asList() const { return content.asList(); }
+Variant::List& MessageImpl::asList()
+{ 
+    if (isVoid()) {
+        content = Variant::List(); 
+        type = LIST; 
+    }
+    return content.asList(); 
+}
+bool MessageImpl::isList() const { return type == LIST; }
+//Empties both representations of the content and returns to VOID; headers and other fields are untouched.
+void MessageImpl::clear() { bytes = EMPTY_STRING; content.reset(); type = VOID; } 
+
+//A Message owns its MessageImpl: allocated in every constructor, deleted in the destructor.
+Message::Message(const std::string& bytes) : impl(new MessageImpl(bytes)) {}
+Message::Message(const char* bytes, size_t count) : impl(new MessageImpl(bytes, count)) {}
+//Copy the whole implementation (reply-to, subject, headers, structured content, ...), matching operator= below.
+Message::Message(const Message& m) : impl(new MessageImpl(*m.impl)) {}
+Message::~Message() { delete impl; }
+//Assigns into the existing impl object, so self-assignment is harmless.
+Message& Message::operator=(const Message& m) { *impl = *m.impl; return *this; }
+//The remaining Message members forward to the MessageImpl.
+void Message::setReplyTo(const Address& d) { impl->setReplyTo(d); }
+const Address& Message::getReplyTo() const { return impl->getReplyTo(); }
+
+void Message::setSubject(const std::string& s) { impl->setSubject(s); }
+const std::string& Message::getSubject() const { return impl->getSubject(); }
+
+void Message::setContentType(const std::string& s) { impl->setContentType(s); }
+const std::string& Message::getContentType() const { return impl->getContentType(); }
+
+const VariantMap& Message::getHeaders() const { return impl->getHeaders(); }
+VariantMap& Message::getHeaders() { return impl->getHeaders(); }
+
+void Message::setBytes(const std::string& c) { impl->setBytes(c); }
+void Message::setBytes(const char* chars, size_t count) { impl->setBytes(chars, count); }
+const std::string& Message::getBytes() const { return impl->getBytes(); }
+std::string& Message::getBytes() { return impl->getBytes(); }
+//pointer/length view of the same bytes
+const char* Message::getRawContent() const { return impl->getBytes().data(); }
+size_t Message::getContentSize() const { return impl->getBytes().size(); }
+//the impl itself is the MessageContent; setContent() goes through its operator= overloads
+MessageContent& Message::getContent() { return *impl; }
+const MessageContent& Message::getContent() const { return *impl; }
+void Message::setContent(const std::string& s) { *impl = s; }
+void Message::setContent(const Variant::Map& m) { *impl = m; }
+void Message::setContent(const Variant::List& l) { *impl = l; }
+
+void Message::encode(Codec& codec) { impl->encode(codec); }
+
+void Message::decode(Codec& codec) { impl->decode(codec); }
+
+void Message::setInternalId(void* i) { impl->setInternalId(i); }
+void* Message::getInternalId() { return impl->getInternalId(); }
+//Streams message content by delegating to its virtual print().
+std::ostream& operator<<(std::ostream& out, const MessageContent& content)
+{
+    return content.print(out);
+}
+
+}} // namespace qpid::messaging

Added: qpid/trunk/qpid/cpp/src/qpid/messaging/Receiver.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Receiver.cpp?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Receiver.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Receiver.cpp Tue Aug 25 17:57:34 2009
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/Receiver.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/ReceiverImpl.h"
+#include "qpid/client/PrivateImplRef.h"
+
+namespace qpid {
+namespace client {
+
+// Short alias for the PrivateImplRef helper specialised for Receiver; the
+// Receiver constructors, destructor and assignment operator in this file call
+// its ctor/copy/dtor/assign functions.
+typedef PrivateImplRef<qpid::messaging::Receiver> PI;
+
+}
+
+namespace messaging {
+
+using qpid::client::PI;
+
+// Handle lifecycle: construction, copy, destruction and assignment are all
+// delegated to the PrivateImplRef helper (PI).
+Receiver::Receiver(ReceiverImpl* receiverImpl)
+{
+    PI::ctor(*this, receiverImpl);
+}
+
+Receiver::Receiver(const Receiver& other) : qpid::client::Handle<ReceiverImpl>()
+{
+    PI::copy(*this, other);
+}
+
+Receiver::~Receiver()
+{
+    PI::dtor(*this);
+}
+
+Receiver& Receiver::operator=(const Receiver& other)
+{
+    return PI::assign(*this, other);
+}
+
+// Everything below forwards straight to the ReceiverImpl behind this handle.
+bool Receiver::get(Message& message, qpid::sys::Duration timeout)
+{
+    return impl->get(message, timeout);
+}
+
+Message Receiver::get(qpid::sys::Duration timeout)
+{
+    return impl->get(timeout);
+}
+
+bool Receiver::fetch(Message& message, qpid::sys::Duration timeout)
+{
+    return impl->fetch(message, timeout);
+}
+
+Message Receiver::fetch(qpid::sys::Duration timeout)
+{
+    return impl->fetch(timeout);
+}
+
+void Receiver::start()
+{
+    impl->start();
+}
+
+void Receiver::stop()
+{
+    impl->stop();
+}
+
+void Receiver::setCapacity(uint32_t capacity)
+{
+    impl->setCapacity(capacity);
+}
+
+void Receiver::cancel()
+{
+    impl->cancel();
+}
+
+void Receiver::setListener(MessageListener* messageListener)
+{
+    impl->setListener(messageListener);
+}
+
+}} // namespace qpid::messaging

Added: qpid/trunk/qpid/cpp/src/qpid/messaging/ReceiverImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/ReceiverImpl.h?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/ReceiverImpl.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/ReceiverImpl.h Tue Aug 25 17:57:34 2009
@@ -0,0 +1,52 @@
+#ifndef QPID_MESSAGING_RECEIVERIMPL_H
+#define QPID_MESSAGING_RECEIVERIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/RefCounted.h"
+#include "qpid/sys/Time.h"
+
+namespace qpid {
+namespace client {
+}
+
+namespace messaging {
+
+class Message;
+class MessageListener;
+
+/**
+ * Abstract, reference-counted interface behind the Receiver handle.
+ * Receiver.cpp forwards each public Receiver call to the method of the
+ * same name declared here; concrete implementations are provided elsewhere.
+ */
+class ReceiverImpl : public virtual qpid::RefCounted
+{
+  public:
+    virtual ~ReceiverImpl() {}
+    // get/fetch come in two shapes: one fills a caller-supplied Message and
+    // returns bool, the other returns the Message by value.
+    // NOTE(review): presumably the bool reports whether a message was
+    // obtained within the timeout -- confirm against the implementations.
+    virtual bool get(Message& message, qpid::sys::Duration timeout) = 0;
+    virtual Message get(qpid::sys::Duration timeout) = 0;
+    virtual bool fetch(Message& message, qpid::sys::Duration timeout) = 0;
+    virtual Message fetch(qpid::sys::Duration timeout) = 0;
+    virtual void start() = 0;
+    virtual void stop() = 0;
+    virtual void setCapacity(uint32_t) = 0;
+    virtual void cancel() = 0;
+    // The listener is passed as a raw pointer; ownership is not expressed by
+    // this interface.
+    virtual void setListener(MessageListener*) = 0;
+};
+}} // namespace qpid::messaging
+
+#endif  /*!QPID_MESSAGING_RECEIVERIMPL_H*/

Added: qpid/trunk/qpid/cpp/src/qpid/messaging/Sender.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Sender.cpp?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Sender.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Sender.cpp Tue Aug 25 17:57:34 2009
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/Sender.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/SenderImpl.h"
+#include "qpid/client/PrivateImplRef.h"
+
+namespace qpid {
+namespace client {
+
+// Short alias for the PrivateImplRef helper specialised for Sender; the
+// Sender constructors, destructor and assignment operator in this file call
+// its ctor/copy/dtor/assign functions.
+typedef PrivateImplRef<qpid::messaging::Sender> PI;
+
+}
+
+namespace messaging {
+
+using qpid::client::PI;
+
+// Handle lifecycle: construction, copy, destruction and assignment are all
+// delegated to the PrivateImplRef helper (PI).
+Sender::Sender(SenderImpl* senderImpl)
+{
+    PI::ctor(*this, senderImpl);
+}
+
+Sender::Sender(const Sender& other) : qpid::client::Handle<SenderImpl>()
+{
+    PI::copy(*this, other);
+}
+
+Sender::~Sender()
+{
+    PI::dtor(*this);
+}
+
+Sender& Sender::operator=(const Sender& other)
+{
+    return PI::assign(*this, other);
+}
+
+// Operations forward to the SenderImpl behind this handle.
+void Sender::send(Message& message)
+{
+    impl->send(message);
+}
+
+void Sender::cancel()
+{
+    impl->cancel();
+}
+
+}} // namespace qpid::messaging

Added: qpid/trunk/qpid/cpp/src/qpid/messaging/SenderImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/SenderImpl.h?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/SenderImpl.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/SenderImpl.h Tue Aug 25 17:57:34 2009
@@ -0,0 +1,44 @@
+#ifndef QPID_MESSAGING_SENDERIMPL_H
+#define QPID_MESSAGING_SENDERIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/RefCounted.h"
+
+namespace qpid {
+namespace client {
+}
+
+namespace messaging {
+
+class Message;
+
+/**
+ * Abstract, reference-counted interface behind the Sender handle.
+ * Sender.cpp forwards Sender::send() and Sender::cancel() to the methods
+ * declared here; concrete implementations are provided elsewhere.
+ */
+class SenderImpl : public virtual qpid::RefCounted
+{
+  public:
+    virtual ~SenderImpl() {}
+    // The message is taken by non-const reference, so an implementation is
+    // allowed to modify it.
+    virtual void send(Message& message) = 0;
+    virtual void cancel() = 0;
+  private:
+};
+}} // namespace qpid::messaging
+
+#endif  /*!QPID_MESSAGING_SENDERIMPL_H*/

Added: qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp Tue Aug 25 17:57:34 2009
@@ -0,0 +1,117 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/Session.h"
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/Filter.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/Sender.h"
+#include "qpid/messaging/Receiver.h"
+#include "qpid/messaging/SessionImpl.h"
+#include "qpid/client/PrivateImplRef.h"
+
+namespace qpid {
+namespace client {
+
+// Short alias for the PrivateImplRef helper specialised for Session; the
+// Session constructors, destructor and assignment operator in this file call
+// its ctor/copy/dtor/assign functions.
+typedef PrivateImplRef<qpid::messaging::Session> PI;
+
+}
+
+namespace messaging {
+
+using qpid::client::PI;
+
+// Handle lifecycle: construction, copy, destruction and assignment are all
+// delegated to the PrivateImplRef helper (PI).
+Session::Session(SessionImpl* sessionImpl)
+{
+    PI::ctor(*this, sessionImpl);
+}
+
+Session::Session(const Session& other) : qpid::client::Handle<SessionImpl>()
+{
+    PI::copy(*this, other);
+}
+
+Session::~Session()
+{
+    PI::dtor(*this);
+}
+
+Session& Session::operator=(const Session& other)
+{
+    return PI::assign(*this, other);
+}
+
+// Session operations forward to the SessionImpl behind this handle.
+void Session::commit()
+{
+    impl->commit();
+}
+
+void Session::rollback()
+{
+    impl->rollback();
+}
+
+void Session::acknowledge()
+{
+    impl->acknowledge();
+}
+
+void Session::reject(Message& message)
+{
+    impl->reject(message);
+}
+
+void Session::close()
+{
+    impl->close();
+}
+
+// Factory methods taking an Address: arguments are forwarded unchanged to the implementation.
+Sender Session::createSender(const Address& target, const VariantMap& opts) { return impl->createSender(target, opts); }
+Receiver Session::createReceiver(const Address& source, const VariantMap& opts) { return impl->createReceiver(source, opts); }
+Receiver Session::createReceiver(const Address& source, const Filter& selector, const VariantMap& opts) { return impl->createReceiver(source, selector, opts); }
+
+// Convenience overloads taking the address as a string: an Address is constructed
+// from the string and the call is forwarded to the Address-based implementation method.
+Sender Session::createSender(const std::string& addressText, const VariantMap& opts) { return impl->createSender(Address(addressText), opts); }
+Receiver Session::createReceiver(const std::string& addressText, const VariantMap& opts) { return impl->createReceiver(Address(addressText), opts); }
+Receiver Session::createReceiver(const std::string& addressText, const Filter& selector, const VariantMap& opts) { return impl->createReceiver(Address(addressText), selector, opts); }
+
+// Temporary queue creation is delegated to the implementation, which receives the base name.
+Address Session::createTempQueue(const std::string& prefix) { return impl->createTempQueue(prefix); }
+
+// sync() and flush() forward to the implementation.
+void Session::sync() { impl->sync(); }
+void Session::flush() { impl->flush(); }
+
+// Session-level fetch and dispatch, each forwarded with the caller's timeout.
+bool Session::fetch(Message& out, qpid::sys::Duration wait) { return impl->fetch(out, wait); }
+Message Session::fetch(qpid::sys::Duration wait) { return impl->fetch(wait); }
+bool Session::dispatch(qpid::sys::Duration wait) { return impl->dispatch(wait); }
+
+// Opaque pointers reported by the implementation for the last confirmed sent
+// and last confirmed acknowledged items; returned to the caller unchanged.
+void* Session::getLastConfirmedSent() { return impl->getLastConfirmedSent(); }
+void* Session::getLastConfirmedAcknowledged() { return impl->getLastConfirmedAcknowledged(); }
+
+}} // namespace qpid::messaging

Added: qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h Tue Aug 25 17:57:34 2009
@@ -0,0 +1,65 @@
+#ifndef QPID_MESSAGING_SESSIONIMPL_H
+#define QPID_MESSAGING_SESSIONIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/RefCounted.h"
+#include <string>
+#include "qpid/messaging/Variant.h"
+#include "qpid/sys/Time.h"
+
+namespace qpid {
+namespace client {
+}
+
+namespace messaging {
+
+class Address;
+class Filter;
+class Message;
+class Sender;
+class Receiver;
+
+/**
+ * Abstract, reference-counted interface behind the Session handle.
+ * Session.cpp forwards each public Session call to the method of the same
+ * name declared here. The string-address overloads of createSender and
+ * createReceiver are resolved in Session.cpp, which builds an Address first,
+ * so only Address-based factories appear in this interface.
+ */
+class SessionImpl : public virtual qpid::RefCounted
+{
+  public:
+    virtual ~SessionImpl() {}
+    virtual void commit() = 0;
+    virtual void rollback() = 0;
+    virtual void acknowledge() = 0;
+    virtual void reject(Message&) = 0;
+    virtual void close() = 0;
+    virtual void sync() = 0;
+    virtual void flush() = 0;
+    // fetch comes in two shapes: one fills a caller-supplied Message and
+    // returns bool, the other returns the Message by value.
+    virtual bool fetch(Message& message, qpid::sys::Duration timeout) = 0;
+    virtual Message fetch(qpid::sys::Duration timeout) = 0;
+    virtual bool dispatch(qpid::sys::Duration timeout) = 0;
+    virtual Address createTempQueue(const std::string& baseName) = 0;
+    virtual Sender createSender(const Address& address, const VariantMap& options) = 0;
+    virtual Receiver createReceiver(const Address& address, const VariantMap& options) = 0;
+    virtual Receiver createReceiver(const Address& address, const Filter& filter, const VariantMap& options) = 0;
+    // Opaque pointers; their meaning is defined by the implementation.
+    virtual void* getLastConfirmedSent() = 0;
+    virtual void* getLastConfirmedAcknowledged() = 0;
+  private:
+};
+}} // namespace qpid::messaging
+
+#endif  /*!QPID_MESSAGING_SESSIONIMPL_H*/

Added: qpid/trunk/qpid/cpp/src/qpid/messaging/Variant.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Variant.cpp?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Variant.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Variant.cpp Tue Aug 25 17:57:34 2009
@@ -0,0 +1,603 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/Variant.h"
+#include <boost/format.hpp>
+#include <boost/lexical_cast.hpp>
+
+namespace qpid {
+namespace client {
+}
+
+namespace messaging {
+
+// Exception raised when a Variant cannot be converted to the requested type;
+// the message is passed through to the Exception base class.
+InvalidConversion::InvalidConversion(const std::string& message)
+    : Exception(message)
+{
+}
+
+
+namespace {
+// Empty string returned by asString() for a VOID variant. Declared const so
+// this file-local constant cannot be modified by accident.
+const std::string EMPTY;
+}
+
+/**
+ * Private representation behind Variant: a type tag plus a union holding
+ * either a scalar value or a pointer to heap-allocated variable-width data
+ * (std::string, Variant::Map or Variant::List).
+ */
+class VariantImpl
+{
+  public:
+    // One constructor per supported type; the default constructor produces
+    // a VOID variant.
+    VariantImpl();
+    VariantImpl(bool);
+    VariantImpl(uint8_t);
+    VariantImpl(uint16_t);
+    VariantImpl(uint32_t);
+    VariantImpl(uint64_t);
+    VariantImpl(int8_t);
+    VariantImpl(int16_t);
+    VariantImpl(int32_t);
+    VariantImpl(int64_t);
+    VariantImpl(float);
+    VariantImpl(double);
+    VariantImpl(const std::string&);
+    VariantImpl(const Variant::Map&);
+    VariantImpl(const Variant::List&);
+    ~VariantImpl();
+
+    VariantType getType() const;
+
+    // Converting accessors: each throws InvalidConversion when the held
+    // type cannot be converted to the requested one.
+    bool asBool() const;
+    uint8_t asUint8() const;
+    uint16_t asUint16() const;
+    uint32_t asUint32() const;
+    uint64_t asUint64() const;
+    int8_t asInt8() const;
+    int16_t asInt16() const;
+    int32_t asInt32() const;
+    int64_t asInt64() const;
+    float asFloat() const;
+    double asDouble() const;
+    std::string asString() const;
+
+    // Container accessors: valid only for MAP / LIST variants respectively.
+    const Variant::Map& asMap() const;
+    Variant::Map& asMap();
+    const Variant::List& asList() const;
+    Variant::List& asList();
+
+    // Direct access to the stored string; valid only for STRING variants
+    // (no conversion is attempted).
+    const std::string& getString() const;
+    std::string& getString();
+
+    void setEncoding(const std::string&);
+    const std::string& getEncoding() const;
+
+    // Allocates a new VariantImpl holding a copy of the value of the given
+    // Variant; the caller owns the returned object.
+    static VariantImpl* create(const Variant&);    
+  private:
+    // Tag selecting the active union member; fixed at construction.
+    const VariantType type;
+    union {
+        bool b;
+        uint8_t ui8;
+        uint16_t ui16;
+        uint32_t ui32;
+        uint64_t ui64;
+        int8_t i8;
+        int16_t i16;
+        int32_t i32;
+        int64_t i64;
+        float f;
+        double d;
+        void* v;//variable width data
+    } value;
+    std::string encoding;//optional encoding for variable length data
+
+    std::string getTypeName(VariantType type) const;
+    // Parses the stored string as a T using boost::lexical_cast and maps a
+    // parse failure to InvalidConversion. Only meaningful when type is
+    // STRING, because value.v is interpreted as a std::string*.
+    template<class T> T convertFromString() const
+    {
+        std::string* s = reinterpret_cast<std::string*>(value.v);
+        try {
+            return boost::lexical_cast<T>(*s);
+        } catch(const boost::bad_lexical_cast&) {
+            throw InvalidConversion(QPID_MSG("Cannot convert " << *s));
+        }
+    }
+};
+
+
+// VOID variant: no payload, the widest integer member is zeroed.
+VariantImpl::VariantImpl() : type(VOID) { value.i64 = 0; }
+// Boolean.
+VariantImpl::VariantImpl(bool flag) : type(BOOL) { value.b = flag; }
+// Unsigned integers.
+VariantImpl::VariantImpl(uint8_t n) : type(UINT8) { value.ui8 = n; }
+VariantImpl::VariantImpl(uint16_t n) : type(UINT16) { value.ui16 = n; }
+VariantImpl::VariantImpl(uint32_t n) : type(UINT32) { value.ui32 = n; }
+VariantImpl::VariantImpl(uint64_t n) : type(UINT64) { value.ui64 = n; }
+// Signed integers.
+VariantImpl::VariantImpl(int8_t n) : type(INT8) { value.i8 = n; }
+VariantImpl::VariantImpl(int16_t n) : type(INT16) { value.i16 = n; }
+VariantImpl::VariantImpl(int32_t n) : type(INT32) { value.i32 = n; }
+VariantImpl::VariantImpl(int64_t n) : type(INT64) { value.i64 = n; }
+// Floating point.
+VariantImpl::VariantImpl(float x) : type(FLOAT) { value.f = x; }
+VariantImpl::VariantImpl(double x) : type(DOUBLE) { value.d = x; }
+// Variable-width kinds: a heap copy of the argument is stored behind value.v
+// and released by the destructor.
+VariantImpl::VariantImpl(const std::string& text) : type(STRING) { value.v = new std::string(text); }
+VariantImpl::VariantImpl(const Variant::Map& entries) : type(MAP) { value.v = new Variant::Map(entries); }
+VariantImpl::VariantImpl(const Variant::List& items) : type(LIST) { value.v = new Variant::List(items); }
+
+// Releases the heap payload. Only STRING, MAP and LIST variants own storage
+// behind value.v; every other kind keeps its value inline in the union.
+VariantImpl::~VariantImpl()
+{
+    if (type == STRING) {
+        delete reinterpret_cast<std::string*>(value.v);
+    } else if (type == MAP) {
+        delete reinterpret_cast<Variant::Map*>(value.v);
+    } else if (type == LIST) {
+        delete reinterpret_cast<Variant::List*>(value.v);
+    }
+}
+
+// Type tag chosen at construction.
+VariantType VariantImpl::getType() const
+{
+    return type;
+}
+
+namespace {
+
+// Compares two characters ignoring case.
+// toupper() has undefined behaviour for negative arguments other than EOF,
+// and plain char may be signed, so both characters are converted to
+// unsigned char before the call.
+bool same_char(char a, char b)
+{
+    return toupper(static_cast<unsigned char>(a)) == toupper(static_cast<unsigned char>(b));
+}
+
+// True when both strings have the same length and match character by
+// character, ignoring case.
+bool caseInsensitiveMatch(const std::string& a, const std::string& b)
+{
+    return a.size() == b.size() && std::equal(a.begin(), a.end(), b.begin(), &same_char);
+}
+
+// Canonical textual forms of the boolean values.
+const std::string TRUE("True");
+const std::string FALSE("False");
+
+// Converts a string to bool: accepts "true"/"false" in any letter case,
+// otherwise tries to parse the string as an int (non-zero means true).
+// Throws InvalidConversion when neither interpretation applies.
+bool toBool(const std::string& s)
+{
+    if (caseInsensitiveMatch(s, TRUE)) return true;
+    if (caseInsensitiveMatch(s, FALSE)) return false;
+    try { return boost::lexical_cast<int>(s); } catch(const boost::bad_lexical_cast&) {}
+    throw InvalidConversion(QPID_MSG("Cannot convert " << s << " to bool"));
+}
+
+}
+
+// Boolean view of the value: VOID is false, integers are true when non-zero,
+// strings go through toBool(); any other kind throws InvalidConversion.
+bool VariantImpl::asBool() const
+{
+    switch(type) {
+      case VOID: return false;
+      case BOOL: return value.b;
+      case UINT8: return value.ui8 != 0;
+      case UINT16: return value.ui16 != 0;
+      case UINT32: return value.ui32 != 0;
+      case UINT64: return value.ui64 != 0;
+      case INT8: return value.i8 != 0;
+      case INT16: return value.i16 != 0;
+      case INT32: return value.i32 != 0;
+      case INT64: return value.i64 != 0;
+      case STRING: {
+          const std::string* text = reinterpret_cast<std::string*>(value.v);
+          return toBool(*text);
+      }
+      default: throw InvalidConversion(QPID_MSG("Cannot convert from " << getTypeName(type) << " to " << getTypeName(BOOL)));
+    }
+}
+// Value as uint8_t. Accepts a UINT8 variant, or a STRING holding a decimal
+// number in [0, 255]; anything else throws InvalidConversion.
+uint8_t VariantImpl::asUint8() const
+{
+    switch(type) {
+      case UINT8: return value.ui8;
+      case STRING: {
+          // boost::lexical_cast to an 8-bit type extracts a single character
+          // rather than a number ("7" would yield 55 and "42" would fail), so
+          // parse as int and range-check explicitly.
+          const int parsed = convertFromString<int>();
+          if (parsed < 0 || parsed > 0xFF) {
+              throw InvalidConversion(QPID_MSG("Cannot convert " << parsed << " to " << getTypeName(UINT8)));
+          }
+          return static_cast<uint8_t>(parsed);
+      }
+      default: throw InvalidConversion(QPID_MSG("Cannot convert from " << getTypeName(type) << " to " << getTypeName(UINT8)));
+    }
+}
+// Unsigned conversions: each accepts unsigned kinds no wider than the target,
+// or a STRING parsed via convertFromString(); everything else throws
+// InvalidConversion.
+uint16_t VariantImpl::asUint16() const
+{
+    if (type == UINT8) return value.ui8;
+    if (type == UINT16) return value.ui16;
+    if (type == STRING) return convertFromString<uint16_t>();
+    throw InvalidConversion(QPID_MSG("Cannot convert from " << getTypeName(type) << " to " << getTypeName(UINT16)));
+}
+
+uint32_t VariantImpl::asUint32() const
+{
+    if (type == UINT8) return value.ui8;
+    if (type == UINT16) return value.ui16;
+    if (type == UINT32) return value.ui32;
+    if (type == STRING) return convertFromString<uint32_t>();
+    throw InvalidConversion(QPID_MSG("Cannot convert from " << getTypeName(type) << " to " << getTypeName(UINT32)));
+}
+
+uint64_t VariantImpl::asUint64() const
+{
+    if (type == UINT8) return value.ui8;
+    if (type == UINT16) return value.ui16;
+    if (type == UINT32) return value.ui32;
+    if (type == UINT64) return value.ui64;
+    if (type == STRING) return convertFromString<uint64_t>();
+    throw InvalidConversion(QPID_MSG("Cannot convert from " << getTypeName(type) << " to " << getTypeName(UINT64)));
+}
+// Value as int8_t. Accepts an INT8 variant, or a STRING holding a decimal
+// number in [-128, 127]; anything else throws InvalidConversion.
+int8_t VariantImpl::asInt8() const
+{
+    switch(type) {
+      case INT8: return value.i8;
+      case STRING: {
+          // boost::lexical_cast to an 8-bit type extracts a single character
+          // rather than a number, so parse as int and range-check explicitly.
+          const int parsed = convertFromString<int>();
+          if (parsed < -128 || parsed > 127) {
+              throw InvalidConversion(QPID_MSG("Cannot convert " << parsed << " to " << getTypeName(INT8)));
+          }
+          return static_cast<int8_t>(parsed);
+      }
+      default: throw InvalidConversion(QPID_MSG("Cannot convert from " << getTypeName(type) << " to " << getTypeName(INT8)));
+    }
+}
+// Signed conversions: each accepts signed kinds no wider than the target, or
+// a STRING parsed via convertFromString(); everything else throws
+// InvalidConversion.
+int16_t VariantImpl::asInt16() const
+{
+    if (type == INT8) return value.i8;
+    if (type == INT16) return value.i16;
+    if (type == STRING) return convertFromString<int16_t>();
+    throw InvalidConversion(QPID_MSG("Cannot convert from " << getTypeName(type) << " to " << getTypeName(INT16)));
+}
+
+int32_t VariantImpl::asInt32() const
+{
+    if (type == INT8) return value.i8;
+    if (type == INT16) return value.i16;
+    if (type == INT32) return value.i32;
+    if (type == STRING) return convertFromString<int32_t>();
+    throw InvalidConversion(QPID_MSG("Cannot convert from " << getTypeName(type) << " to " << getTypeName(INT32)));
+}
+
+int64_t VariantImpl::asInt64() const
+{
+    if (type == INT8) return value.i8;
+    if (type == INT16) return value.i16;
+    if (type == INT32) return value.i32;
+    if (type == INT64) return value.i64;
+    if (type == STRING) return convertFromString<int64_t>();
+    throw InvalidConversion(QPID_MSG("Cannot convert from " << getTypeName(type) << " to " << getTypeName(INT64)));
+}
+// Floating-point conversions. asFloat() accepts FLOAT or a parsable STRING;
+// asDouble() additionally accepts DOUBLE. Other kinds throw InvalidConversion.
+float VariantImpl::asFloat() const
+{
+    if (type == FLOAT) return value.f;
+    if (type == STRING) return convertFromString<float>();
+    throw InvalidConversion(QPID_MSG("Cannot convert from " << getTypeName(type) << " to " << getTypeName(FLOAT)));
+}
+
+double VariantImpl::asDouble() const
+{
+    if (type == FLOAT) return value.f;
+    if (type == DOUBLE) return value.d;
+    if (type == STRING) return convertFromString<double>();
+    throw InvalidConversion(QPID_MSG("Cannot convert from " << getTypeName(type) << " to " << getTypeName(DOUBLE)));
+}
+// Textual form of the value. VOID yields the empty string, booleans yield
+// "True"/"False", numbers are formatted with boost::lexical_cast and strings
+// are returned as they are. MAP and LIST throw InvalidConversion.
+std::string VariantImpl::asString() const
+{
+    switch(type) {
+      case VOID:
+        return EMPTY;
+      case BOOL:
+        if (value.b) return TRUE;
+        return FALSE;
+      // 8-bit values are widened to int so that they are formatted as numbers
+      // rather than as single characters.
+      case UINT8:
+        return boost::lexical_cast<std::string>(static_cast<int>(value.ui8));
+      case INT8:
+        return boost::lexical_cast<std::string>(static_cast<int>(value.i8));
+      case UINT16:
+        return boost::lexical_cast<std::string>(value.ui16);
+      case UINT32:
+        return boost::lexical_cast<std::string>(value.ui32);
+      case UINT64:
+        return boost::lexical_cast<std::string>(value.ui64);
+      case INT16:
+        return boost::lexical_cast<std::string>(value.i16);
+      case INT32:
+        return boost::lexical_cast<std::string>(value.i32);
+      case INT64:
+        return boost::lexical_cast<std::string>(value.i64);
+      case FLOAT:
+        return boost::lexical_cast<std::string>(value.f);
+      case DOUBLE:
+        return boost::lexical_cast<std::string>(value.d);
+      case STRING:
+        return *reinterpret_cast<std::string*>(value.v);
+      default:
+        throw InvalidConversion(QPID_MSG("Cannot convert from " << getTypeName(type) << " to " << getTypeName(STRING)));
+    }
+}
+
+// Container accessors: no conversion is attempted, so a variant of any other
+// kind throws InvalidConversion.
+const Variant::Map& VariantImpl::asMap() const
+{
+    if (type != MAP) {
+        throw InvalidConversion(QPID_MSG("Cannot convert from " << getTypeName(type) << " to " << getTypeName(MAP)));
+    }
+    return *reinterpret_cast<Variant::Map*>(value.v);
+}
+
+Variant::Map& VariantImpl::asMap()
+{
+    if (type != MAP) {
+        throw InvalidConversion(QPID_MSG("Cannot convert from " << getTypeName(type) << " to " << getTypeName(MAP)));
+    }
+    return *reinterpret_cast<Variant::Map*>(value.v);
+}
+
+const Variant::List& VariantImpl::asList() const
+{
+    if (type != LIST) {
+        throw InvalidConversion(QPID_MSG("Cannot convert from " << getTypeName(type) << " to " << getTypeName(LIST)));
+    }
+    return *reinterpret_cast<Variant::List*>(value.v);
+}
+
+Variant::List& VariantImpl::asList()
+{
+    if (type != LIST) {
+        throw InvalidConversion(QPID_MSG("Cannot convert from " << getTypeName(type) << " to " << getTypeName(LIST)));
+    }
+    return *reinterpret_cast<Variant::List*>(value.v);
+}
+
+// Direct reference to the stored string. Unlike asString() this performs no
+// conversion: a non-STRING variant throws InvalidConversion.
+std::string& VariantImpl::getString()
+{
+    if (type != STRING) {
+        throw InvalidConversion(QPID_MSG("Variant is not a string; use asString() if conversion is required."));
+    }
+    return *reinterpret_cast<std::string*>(value.v);
+}
+
+const std::string& VariantImpl::getString() const
+{
+    if (type != STRING) {
+        throw InvalidConversion(QPID_MSG("Variant is not a string; use asString() if conversion is required."));
+    }
+    return *reinterpret_cast<std::string*>(value.v);
+}
+
+// Optional encoding label kept alongside the value.
+void VariantImpl::setEncoding(const std::string& name)
+{
+    encoding = name;
+}
+
+const std::string& VariantImpl::getEncoding() const
+{
+    return encoding;
+}
+
+// Returns the lower-case name of a variant type, used when building
+// InvalidConversion messages. The switch deliberately has no default label;
+// a value matching none of the cases falls through to "<unknown>".
+std::string VariantImpl::getTypeName(VariantType type) const
+{
+    switch (type) {
+      case VOID: return "void";
+      case BOOL: return "bool";
+      case UINT8: return "uint8";
+      case UINT16: return "uint16";
+      case UINT32: return "uint32";
+      case UINT64: return "uint64";
+      case INT8: return "int8";
+      case INT16: return "int16";
+      case INT32: return "int32";
+      case INT64: return "int64";
+      case FLOAT: return "float";
+      case DOUBLE: return "double";
+      case STRING: return "string";
+      case MAP: return "map";
+      case LIST: return "list";
+    }
+    return "<unknown>";//should never happen
+}
+
+// Allocates a new VariantImpl holding a copy of v: same type, same value and
+// same encoding. Used by Variant's copy constructor and copy assignment.
+// The caller owns the returned object. Kinds not listed (i.e. VOID) produce
+// a VOID implementation.
+VariantImpl* VariantImpl::create(const Variant& v)
+{
+    VariantImpl* copy = 0;
+    switch (v.getType()) {
+      case BOOL: copy = new VariantImpl(v.asBool()); break;
+      case UINT8: copy = new VariantImpl(v.asUint8()); break;
+      case UINT16: copy = new VariantImpl(v.asUint16()); break;
+      case UINT32: copy = new VariantImpl(v.asUint32()); break;
+      case UINT64: copy = new VariantImpl(v.asUint64()); break;
+      case INT8: copy = new VariantImpl(v.asInt8()); break;
+      case INT16: copy = new VariantImpl(v.asInt16()); break;
+      case INT32: copy = new VariantImpl(v.asInt32()); break;
+      case INT64: copy = new VariantImpl(v.asInt64()); break;
+      case FLOAT: copy = new VariantImpl(v.asFloat()); break;
+      case DOUBLE: copy = new VariantImpl(v.asDouble()); break;
+      case STRING: copy = new VariantImpl(v.asString()); break;
+      case MAP: copy = new VariantImpl(v.asMap()); break;
+      case LIST: copy = new VariantImpl(v.asList()); break;
+      default: copy = new VariantImpl(); break;
+    }
+    // Carry the encoding across as well; previously a copied Variant silently
+    // lost the encoding set on the original.
+    try {
+        copy->setEncoding(v.getEncoding());
+    } catch (...) {
+        // Copying the encoding string may fail to allocate; do not leak.
+        delete copy;
+        throw;
+    }
+    return copy;
+}
+
+// Each constructor allocates the matching VariantImpl; the default
+// constructor yields a VOID variant.
+Variant::Variant() : impl(new VariantImpl()) {}
+Variant::Variant(bool flag) : impl(new VariantImpl(flag)) {}
+Variant::Variant(uint8_t n) : impl(new VariantImpl(n)) {}
+Variant::Variant(uint16_t n) : impl(new VariantImpl(n)) {}
+Variant::Variant(uint32_t n) : impl(new VariantImpl(n)) {}
+Variant::Variant(uint64_t n) : impl(new VariantImpl(n)) {}
+Variant::Variant(int8_t n) : impl(new VariantImpl(n)) {}
+Variant::Variant(int16_t n) : impl(new VariantImpl(n)) {}
+Variant::Variant(int32_t n) : impl(new VariantImpl(n)) {}
+Variant::Variant(int64_t n) : impl(new VariantImpl(n)) {}
+Variant::Variant(float x) : impl(new VariantImpl(x)) {}
+Variant::Variant(double x) : impl(new VariantImpl(x)) {}
+Variant::Variant(const std::string& text) : impl(new VariantImpl(text)) {}
+// A C string is converted to std::string first, giving a STRING variant.
+Variant::Variant(const char* text) : impl(new VariantImpl(std::string(text))) {}
+Variant::Variant(const Map& entries) : impl(new VariantImpl(entries)) {}
+Variant::Variant(const List& items) : impl(new VariantImpl(items)) {}
+// Copy construction goes through VariantImpl::create().
+Variant::Variant(const Variant& other) : impl(VariantImpl::create(other)) {}
+
+// Releases the implementation object (deleting a null pointer is a no-op).
+Variant::~Variant()
+{
+    delete impl;
+}
+
+// Resets this variant to VOID.
+// The replacement is allocated before the old implementation is released so
+// that impl never points at freed memory if the allocation throws.
+void Variant::reset()
+{
+    VariantImpl* fresh = new VariantImpl();
+    delete impl;
+    impl = fresh;
+}
+
+
+// Scalar assignment operators. Each replaces the current implementation with
+// one holding the new value. The replacement is allocated first and the old
+// implementation released afterwards, so that impl never points at freed
+// memory if the allocation throws.
+Variant& Variant::operator=(bool b)
+{
+    VariantImpl* fresh = new VariantImpl(b);
+    delete impl;
+    impl = fresh;
+    return *this;
+}
+
+Variant& Variant::operator=(uint8_t i)
+{
+    VariantImpl* fresh = new VariantImpl(i);
+    delete impl;
+    impl = fresh;
+    return *this;
+}
+Variant& Variant::operator=(uint16_t i)
+{
+    VariantImpl* fresh = new VariantImpl(i);
+    delete impl;
+    impl = fresh;
+    return *this;
+}
+Variant& Variant::operator=(uint32_t i)
+{
+    VariantImpl* fresh = new VariantImpl(i);
+    delete impl;
+    impl = fresh;
+    return *this;
+}
+Variant& Variant::operator=(uint64_t i)
+{
+    VariantImpl* fresh = new VariantImpl(i);
+    delete impl;
+    impl = fresh;
+    return *this;
+}
+
+Variant& Variant::operator=(int8_t i)
+{
+    VariantImpl* fresh = new VariantImpl(i);
+    delete impl;
+    impl = fresh;
+    return *this;
+}
+Variant& Variant::operator=(int16_t i)
+{
+    VariantImpl* fresh = new VariantImpl(i);
+    delete impl;
+    impl = fresh;
+    return *this;
+}
+Variant& Variant::operator=(int32_t i)
+{
+    VariantImpl* fresh = new VariantImpl(i);
+    delete impl;
+    impl = fresh;
+    return *this;
+}
+Variant& Variant::operator=(int64_t i)
+{
+    VariantImpl* fresh = new VariantImpl(i);
+    delete impl;
+    impl = fresh;
+    return *this;
+}
+
+Variant& Variant::operator=(float f)
+{
+    VariantImpl* fresh = new VariantImpl(f);
+    delete impl;
+    impl = fresh;
+    return *this;
+}
+Variant& Variant::operator=(double d)
+{
+    VariantImpl* fresh = new VariantImpl(d);
+    delete impl;
+    impl = fresh;
+    return *this;
+}
+
+// Assignment from variable-width values.
+// The argument may refer to data owned by this very variant (for example
+// v = v.getString(), or v = v.asMap()), so the replacement must be built
+// from it BEFORE the current implementation is deleted; otherwise the copy
+// would read freed memory. Allocating first also keeps impl valid if the
+// allocation throws.
+Variant& Variant::operator=(const std::string& s)
+{
+    VariantImpl* fresh = new VariantImpl(s);
+    delete impl;
+    impl = fresh;
+    return *this;
+}
+
+Variant& Variant::operator=(const char* s)
+{
+    VariantImpl* fresh = new VariantImpl(std::string(s));
+    delete impl;
+    impl = fresh;
+    return *this;
+}
+
+Variant& Variant::operator=(const Map& m)
+{
+    VariantImpl* fresh = new VariantImpl(m);
+    delete impl;
+    impl = fresh;
+    return *this;
+}
+
+Variant& Variant::operator=(const List& l)
+{
+    VariantImpl* fresh = new VariantImpl(l);
+    delete impl;
+    impl = fresh;
+    return *this;
+}
+
+// Copy assignment.
+// The copy is created before the current implementation is deleted. This
+// makes self-assignment safe (previously impl was deleted and then read
+// through v), and also covers the case where v is an element stored inside
+// this variant's own map or list.
+Variant& Variant::operator=(const Variant& v)
+{
+    VariantImpl* fresh = VariantImpl::create(v);
+    delete impl;
+    impl = fresh;
+    return *this;
+}
+
+// Type query and scalar/string conversions: all forwarded to the
+// implementation object.
+VariantType Variant::getType() const
+{
+    return impl->getType();
+}
+
+bool Variant::asBool() const
+{
+    return impl->asBool();
+}
+
+uint8_t Variant::asUint8() const
+{
+    return impl->asUint8();
+}
+
+uint16_t Variant::asUint16() const
+{
+    return impl->asUint16();
+}
+
+uint32_t Variant::asUint32() const
+{
+    return impl->asUint32();
+}
+
+uint64_t Variant::asUint64() const
+{
+    return impl->asUint64();
+}
+
+int8_t Variant::asInt8() const
+{
+    return impl->asInt8();
+}
+
+int16_t Variant::asInt16() const
+{
+    return impl->asInt16();
+}
+
+int32_t Variant::asInt32() const
+{
+    return impl->asInt32();
+}
+
+int64_t Variant::asInt64() const
+{
+    return impl->asInt64();
+}
+
+float Variant::asFloat() const
+{
+    return impl->asFloat();
+}
+
+double Variant::asDouble() const
+{
+    return impl->asDouble();
+}
+
+std::string Variant::asString() const
+{
+    return impl->asString();
+}
+// Container, string and encoding accessors: all forwarded to the
+// implementation object.
+const Variant::Map& Variant::asMap() const
+{
+    return impl->asMap();
+}
+
+Variant::Map& Variant::asMap()
+{
+    return impl->asMap();
+}
+
+const Variant::List& Variant::asList() const
+{
+    return impl->asList();
+}
+
+Variant::List& Variant::asList()
+{
+    return impl->asList();
+}
+
+const std::string& Variant::getString() const
+{
+    return impl->getString();
+}
+
+std::string& Variant::getString()
+{
+    return impl->getString();
+}
+
+void Variant::setEncoding(const std::string& name)
+{
+    impl->setEncoding(name);
+}
+
+const std::string& Variant::getEncoding() const
+{
+    return impl->getEncoding();
+}
+
+// Implicit conversion operators; each is shorthand for the matching as*()
+// call and throws InvalidConversion under the same conditions.
+Variant::operator bool() const { return asBool(); }
+Variant::operator uint8_t() const { return asUint8(); }
+Variant::operator uint16_t() const { return asUint16(); }
+Variant::operator uint32_t() const { return asUint32(); }
+Variant::operator uint64_t() const { return asUint64(); }
+Variant::operator int8_t() const { return asInt8(); }
+Variant::operator int16_t() const { return asInt16(); }
+Variant::operator int32_t() const { return asInt32(); }
+Variant::operator int64_t() const { return asInt64(); }
+Variant::operator float() const { return asFloat(); }
+Variant::operator double() const { return asDouble(); }
+// asString() returns a temporary std::string, so taking c_str() of it would
+// hand the caller a dangling pointer. Use the stored string instead: the
+// pointer then refers to storage owned by this variant's implementation.
+// Non-STRING variants throw InvalidConversion (via getString()).
+Variant::operator const char*() const { return getString().c_str(); }
+
+// Writes a map as comma-separated key:value pairs (no surrounding braces).
+std::ostream& operator<<(std::ostream& out, const Variant::Map& map)
+{
+    bool first = true;
+    for (Variant::Map::const_iterator entry = map.begin(); entry != map.end(); ++entry) {
+        if (!first) {
+            out << ", ";
+        }
+        first = false;
+        out << entry->first << ":" << entry->second;
+    }
+    return out;
+}
+
+// Writes a list as comma-separated elements (no surrounding brackets).
+std::ostream& operator<<(std::ostream& out, const Variant::List& list)
+{
+    bool first = true;
+    for (Variant::List::const_iterator item = list.begin(); item != list.end(); ++item) {
+        if (!first) {
+            out << ", ";
+        }
+        first = false;
+        out << *item;
+    }
+    return out;
+}
+
+// Writes a variant: maps are wrapped in {}, lists in [], VOID is shown as
+// "<void>", and every other kind is written via asString().
+std::ostream& operator<<(std::ostream& out, const Variant& value)
+{
+    const VariantType kind = value.getType();
+    if (kind == MAP) {
+        out << "{" << value.asMap() << "}";
+    } else if (kind == LIST) {
+        out << "[" << value.asList() << "]";
+    } else if (kind == VOID) {
+        out << "<void>";
+    } else {
+        out << value.asString();
+    }
+    return out;
+}
+
+}} // namespace qpid::messaging

Modified: qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt?rev=807731&r1=807730&r2=807731&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt Tue Aug 25 17:57:34 2009
@@ -95,6 +95,7 @@
     InlineAllocator
     InlineVector
     ClientSessionTest
+    MessagingSessionTests
     SequenceSet
     StringUtils
     IncompleteMessageList
@@ -128,6 +129,7 @@
     ReplicationTest
     ClientMessageTest
     PollableCondition
+    Variant
     ${xml_tests}
     CACHE STRING "Which unit tests to build"
    )

Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=807731&r1=807730&r2=807731&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/tests/Makefile.am Tue Aug 25 17:57:34 2009
@@ -65,6 +65,7 @@
 	$(lib_client) $(lib_broker) $(lib_console)
 
 unit_test_SOURCES= unit_test.cpp unit_test.h \
+	MessagingSessionTests.cpp \
 	ClientSessionTest.cpp \
 	BrokerFixture.h SocketProxy.h \
 	exception_test.cpp \
@@ -111,7 +112,8 @@
 	FrameDecoder.cpp \
 	ReplicationTest.cpp \
 	ClientMessageTest.cpp \
-	PollableCondition.cpp
+	PollableCondition.cpp \
+	Variant.cpp
 
 if HAVE_XML
 unit_test_SOURCES+= XmlClientSessionTest.cpp

Added: qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp (added)
+++ qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp Tue Aug 25 17:57:34 2009
@@ -0,0 +1,325 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "unit_test.h"
+#include "test_tools.h"
+#include "BrokerFixture.h"
+#include "qpid/messaging/Connection.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/MessageListener.h"
+#include "qpid/messaging/Receiver.h"
+#include "qpid/messaging/Sender.h"
+#include "qpid/messaging/Session.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Session.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/sys/Time.h"
+#include <boost/assign.hpp>
+#include <boost/format.hpp>
+#include <string>
+#include <vector>
+
+QPID_AUTO_TEST_SUITE(MessagingSessionTests)
+
+using namespace qpid::messaging;
+using namespace qpid;
+using qpid::broker::Broker;
+
+// Test helper that owns a qpid::client connection and session to a broker on
+// localhost and uses them to declare and delete the queues and exchanges the
+// tests need. The session and the connection are closed on destruction.
+struct BrokerAdmin
+{
+    qpid::client::Connection connection;
+    qpid::client::Session session;
+
+    // Opens the connection to localhost on the given port and starts a new
+    // session on it.
+    BrokerAdmin(uint16_t port)
+    {
+        connection.open("localhost", port);
+        session = connection.newSession();
+    }
+
+    // Closes the session first, then the connection.
+    ~BrokerAdmin()
+    {
+        session.close();
+        connection.close();
+    }
+
+    // Declares a queue with the given name.
+    void createQueue(const std::string& name) { session.queueDeclare(qpid::client::arg::queue=name); }
+
+    // Deletes the queue with the given name.
+    void deleteQueue(const std::string& name) { session.queueDelete(qpid::client::arg::queue=name); }
+
+    // Declares an exchange with the given name and exchange type.
+    void createExchange(const std::string& name, const std::string& type)
+    {
+        session.exchangeDeclare(qpid::client::arg::exchange=name, qpid::client::arg::type=type);
+    }
+
+    // Deletes the exchange with the given name.
+    void deleteExchange(const std::string& name) { session.exchangeDelete(qpid::client::arg::exchange=name); }
+};
+
+// Fixture that builds on BrokerFixture and adds a messaging-API connection
+// and session, plus a BrokerAdmin for declaring and deleting broker entities.
+// Both the connection URL and the admin use the port reported by
+// broker->getPort(Broker::TCP_TRANSPORT).
+struct MessagingFixture : public BrokerFixture
+{
+    // The initialiser list relies on this declaration order: the session is
+    // created from the connection, so the connection must come first.
+    Connection connection;
+    Session session;
+    BrokerAdmin admin;
+
+    MessagingFixture(Broker::Options opts = Broker::Options())
+        : BrokerFixture(opts),
+          connection(Connection::open(
+              (boost::format("amqp:tcp:localhost:%1%") % (broker->getPort(Broker::TCP_TRANSPORT))).str())),
+          session(connection.newSession()),
+          admin(broker->getPort(Broker::TCP_TRANSPORT))
+    {}
+
+    // Closes the messaging session, then the messaging connection.
+    ~MessagingFixture()
+    {
+        session.close();
+        connection.close();
+    }
+};
+
+// MessagingFixture that additionally declares one queue (named "test-queue"
+// unless another name is given) on construction and deletes it again on
+// destruction.
+struct QueueFixture : MessagingFixture
+{
+    std::string queue;
+
+    // Remembers the queue name and declares the queue through the admin helper.
+    QueueFixture(const std::string& name = "test-queue") : queue(name) { admin.createQueue(queue); }
+
+    // Deletes the queue declared by the constructor.
+    ~QueueFixture() { admin.deleteQueue(queue); }
+};
+
+// MessagingFixture that additionally declares one exchange on construction
+// and deletes it again on destruction. By default the exchange is called
+// "test-topic" and is of type "fanout".
+struct TopicFixture : MessagingFixture
+{
+    std::string topic;
+
+    // Remembers the exchange name and declares the exchange, with the given
+    // type, through the admin helper.
+    TopicFixture(const std::string& name = "test-topic", const std::string& type="fanout") : topic(name)
+    {
+        admin.createExchange(topic, type);
+    }
+
+    // Deletes the exchange declared by the constructor.
+    ~TopicFixture() { admin.deleteExchange(topic); }
+};
+
+// MessagingFixture that declares a whole set of queues on construction (by
+// default "q1", "q2" and "q3") and deletes all of them on destruction.
+struct MultiQueueFixture : MessagingFixture
+{
+    typedef std::vector<std::string>::const_iterator const_iterator;
+    std::vector<std::string> queues;
+
+    // Stores the queue names and declares each queue, in the order given.
+    MultiQueueFixture(const std::vector<std::string>& names = boost::assign::list_of<std::string>("q1")("q2")("q3")) : queues(names)
+    {
+        const_iterator name = queues.begin();
+        while (name != queues.end()) {
+            admin.createQueue(*name);
+            ++name;
+        }
+    }
+
+    // Deletes every queue declared by the constructor, in the same order.
+    ~MultiQueueFixture()
+    {
+        const_iterator name = queues.begin();
+        while (name != queues.end()) {
+            admin.deleteQueue(*name);
+            ++name;
+        }
+    }
+};
+
+// MessageListener that records the byte content of every message it is given,
+// in the order the messages are delivered to it.
+struct MessageDataCollector : MessageListener
+{
+    std::vector<std::string> messageData;
+
+    // Appends the bytes of the received message to messageData.
+    void received(Message& message)
+    {
+        messageData.push_back(message.getBytes());
+    }
+};
+
+// Fetches up to 'count' messages from the receiver, passing 'timeout' to each
+// individual fetch, and returns their byte contents in the order fetched.
+// Stops early, returning fewer entries, as soon as a fetch returns false.
+std::vector<std::string> fetch(Receiver& receiver, int count, qpid::sys::Duration timeout=qpid::sys::TIME_SEC*5)
+{
+    std::vector<std::string> payloads;
+    Message current;
+    int outstanding = count;
+    while (outstanding > 0 && receiver.fetch(current, timeout)) {
+        payloads.push_back(current.getBytes());
+        --outstanding;
+    }
+    return payloads;
+}
+
+// Sends one message to a queue, fetches it back through a receiver on the
+// same queue and checks that the bytes received equal the bytes sent.
+QPID_AUTO_TEST_CASE(testSimpleSendReceive)
+{
+    QueueFixture fixture;
+
+    Sender sender = fixture.session.createSender(fixture.queue);
+    Message sent("test-message");
+    sender.send(sent);
+
+    Receiver receiver = fixture.session.createReceiver(fixture.queue);
+    Message received = receiver.fetch(5 * qpid::sys::TIME_SEC);
+    fixture.session.acknowledge();
+
+    BOOST_CHECK_EQUAL(received.getBytes(), sent.getBytes());
+}
+
+// Creating a sender for an address that does not exist is expected to throw.
+QPID_AUTO_TEST_CASE(testSenderError)
+{
+    MessagingFixture fixture;
+    //TODO: this is the wrong type for the exception; define explicit set in messaging namespace
+    BOOST_CHECK_THROW(
+        fixture.session.createSender("NonExistentAddress"),
+        qpid::framing::NotFoundException);
+}
+
+// Creating a receiver for an address that does not exist is expected to throw.
+QPID_AUTO_TEST_CASE(testReceiverError)
+{
+    MessagingFixture fixture;
+    //TODO: this is the wrong type for the exception; define explicit set in messaging namespace
+    BOOST_CHECK_THROW(
+        fixture.session.createReceiver("NonExistentAddress"),
+        qpid::framing::NotFoundException);
+}
+
+QPID_AUTO_TEST_CASE(testSimpleTopic) // expects each subscriber to see only the messages sent after it was created
+{
+    TopicFixture fix;
+    // "one" is sent before any receiver exists; none of the checks below expects it.
+    Sender sender = fix.session.createSender(fix.topic);
+    Message msg("one");
+    sender.send(msg);
+    Receiver sub1 = fix.session.createReceiver(fix.topic); // created before "two" is sent
+    sub1.setCapacity(10u);
+    sub1.start();
+    msg.setBytes("two"); // the same Message object is reused with new content for each send
+    sender.send(msg);
+    Receiver sub2 = fix.session.createReceiver(fix.topic); // created before "three" is sent
+    sub2.setCapacity(10u);
+    sub2.start();
+    msg.setBytes("three");
+    sender.send(msg);
+    Receiver sub3 = fix.session.createReceiver(fix.topic); // created before "four" is sent
+    sub3.setCapacity(10u);
+    sub3.start();
+    msg.setBytes("four");
+    sender.send(msg);
+    BOOST_CHECK_EQUAL(fetch(sub2, 2), boost::assign::list_of<std::string>("three")("four"));
+    sub2.cancel(); // sub2 is cancelled before "five" is sent
+    // "five" is expected by sub1 and sub3 only.
+    msg.setBytes("five");
+    sender.send(msg);
+    BOOST_CHECK_EQUAL(fetch(sub1, 4), boost::assign::list_of<std::string>("two")("three")("four")("five"));
+    BOOST_CHECK_EQUAL(fetch(sub3, 2), boost::assign::list_of<std::string>("four")("five"));
+    Message in;
+    BOOST_CHECK(!sub2.fetch(in, 0));//TODO: or should this raise an error?
+
+    // The cancelled receiver is expected to return false from a fetch with timeout 0.
+    //TODO: check pending messages...
+}
+
+// Starts a receiver on every queue of the fixture, sends one message
+// ("Message_<n>") to each queue and then checks that fetching from the
+// session, rather than from an individual receiver, returns one message per
+// queue, with the contents in the order they were sent.
+QPID_AUTO_TEST_CASE(testSessionFetch)
+{
+    // Index type that matches fix.queues.size(). It replaces the non-standard
+    // 'uint' typedef, which is not declared on every platform.
+    typedef std::vector<std::string>::size_type Index;
+
+    MultiQueueFixture fix;
+
+    for (Index i = 0; i < fix.queues.size(); i++) {
+        Receiver r = fix.session.createReceiver(fix.queues[i]);
+        r.setCapacity(10u);
+        r.start();//TODO: add Session::start
+    }
+
+    for (Index i = 0; i < fix.queues.size(); i++) {
+        Sender s = fix.session.createSender(fix.queues[i]);
+        Message msg((boost::format("Message_%1%") % (i+1)).str());
+        s.send(msg);
+    }
+
+    for (Index i = 0; i < fix.queues.size(); i++) {
+        Message msg;
+        BOOST_CHECK(fix.session.fetch(msg, qpid::sys::TIME_SEC));
+        BOOST_CHECK_EQUAL(msg.getBytes(), (boost::format("Message_%1%") % (i+1)).str());
+    }
+}
+
+// Attaches one shared listener to a receiver on every queue of the fixture,
+// sends one message ("Message_<n>") to each queue, dispatches through the
+// session until dispatch() returns false and then checks that the listener
+// collected the three message bodies in the order they were sent.
+QPID_AUTO_TEST_CASE(testSessionDispatch)
+{
+    // Index type that matches fix.queues.size(). It replaces the non-standard
+    // 'uint' typedef, which is not declared on every platform.
+    typedef std::vector<std::string>::size_type Index;
+
+    MultiQueueFixture fix;
+
+    MessageDataCollector collector;
+    for (Index i = 0; i < fix.queues.size(); i++) {
+        Receiver r = fix.session.createReceiver(fix.queues[i]);
+        r.setListener(&collector);
+        r.setCapacity(10u);
+        r.start();//TODO: add Session::start
+    }
+
+    for (Index i = 0; i < fix.queues.size(); i++) {
+        Sender s = fix.session.createSender(fix.queues[i]);
+        Message msg((boost::format("Message_%1%") % (i+1)).str());
+        s.send(msg);
+    }
+
+    // Keep dispatching until a call returns false; all the work happens
+    // inside dispatch(), so the loop body is intentionally empty.
+    while (fix.session.dispatch(qpid::sys::TIME_SEC)) {
+    }
+
+    BOOST_CHECK_EQUAL(collector.messageData, boost::assign::list_of<std::string>("Message_1")("Message_2")("Message_3"));
+}
+
+
+// Sends a message whose content is a map with a string entry and a float
+// entry, fetches it back and checks both the raw bytes and the two entries.
+QPID_AUTO_TEST_CASE(testMapMessage)
+{
+    QueueFixture fixture;
+    Sender sender = fixture.session.createSender(fixture.queue);
+
+    Message sent;
+    sent.getContent().asMap()["abc"] = "def";
+    sent.getContent().asMap()["pi"] = 3.14f;
+    sender.send(sent);
+
+    Receiver receiver = fixture.session.createReceiver(fixture.queue);
+    Message received = receiver.fetch(5 * qpid::sys::TIME_SEC);
+
+    BOOST_CHECK_EQUAL(received.getBytes(), sent.getBytes());
+    BOOST_CHECK_EQUAL(received.getContent().asMap()["abc"].asString(), "def");
+    BOOST_CHECK_EQUAL(received.getContent().asMap()["pi"].asFloat(), 3.14f);
+    fixture.session.acknowledge();
+}
+
+// Sends a message whose content is a list of four values (string, integer,
+// string, double), fetches it back and checks the raw bytes, the list size
+// and then each element in turn.
+QPID_AUTO_TEST_CASE(testListMessage)
+{
+    QueueFixture fixture;
+    Sender sender = fixture.session.createSender(fixture.queue);
+
+    Message sent;
+    sent.getContent() = Variant::List();
+    sent.getContent() << "abc";
+    sent.getContent() << 1234;
+    sent.getContent() << "def";
+    sent.getContent() << 56.789;
+    sender.send(sent);
+
+    Receiver receiver = fixture.session.createReceiver(fixture.queue);
+    Message received = receiver.fetch(5 * qpid::sys::TIME_SEC);
+    BOOST_CHECK_EQUAL(received.getBytes(), sent.getBytes());
+
+    Variant::List& items = received.getContent().asList();
+    BOOST_CHECK_EQUAL(items.size(), sent.getContent().asList().size());
+
+    // Each checked element is popped so that the next one becomes the front.
+    BOOST_CHECK_EQUAL(items.front().asString(), "abc");
+    items.pop_front();
+    BOOST_CHECK_EQUAL(items.front().asInt64(), 1234);
+    items.pop_front();
+    BOOST_CHECK_EQUAL(items.front().asString(), "def");
+    items.pop_front();
+    BOOST_CHECK_EQUAL(items.front().asDouble(), 56.789);
+
+    fixture.session.acknowledge();
+}
+
+QPID_AUTO_TEST_SUITE_END()

Added: qpid/trunk/qpid/cpp/src/tests/Variant.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Variant.cpp?rev=807731&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Variant.cpp (added)
+++ qpid/trunk/qpid/cpp/src/tests/Variant.cpp Tue Aug 25 17:57:34 2009
@@ -0,0 +1,157 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include "qpid/messaging/Variant.h"
+
+#include "unit_test.h"
+
+using namespace qpid::messaging;
+
+QPID_AUTO_TEST_SUITE(VariantSuite)
+
+// Exercises conversions between the value types a Variant can hold and checks
+// that the conversions expected to be rejected throw InvalidConversion.
+QPID_AUTO_TEST_CASE(testConversions)
+{
+    Variant v;
+
+    // A numeric string converts to float and to double.
+    v = "1.5";
+    BOOST_CHECK_EQUAL(1.5f, v.asFloat());
+    BOOST_CHECK_EQUAL(1.5, v.asDouble());
+
+    // A float converts to float, double and string.
+    v = 1.5f;
+    BOOST_CHECK_EQUAL(1.5f, v.asFloat());
+    BOOST_CHECK_EQUAL(1.5, v.asDouble());
+    BOOST_CHECK_EQUAL(std::string("1.5"), v.asString());
+
+    // A double converts to double and string, but not to float.
+    v = 1.5;
+    BOOST_CHECK_THROW(v.asFloat(), InvalidConversion);
+    BOOST_CHECK_EQUAL(1.5, v.asDouble());
+    BOOST_CHECK_EQUAL(std::string("1.5"), v.asString());
+
+    // A uint8 converts to every unsigned width and to string, but not to int8.
+    v = static_cast<uint8_t>(7);
+    BOOST_CHECK_EQUAL(static_cast<uint8_t>(7), v.asUint8());
+    BOOST_CHECK_EQUAL(static_cast<uint16_t>(7), v.asUint16());
+    BOOST_CHECK_EQUAL(static_cast<uint32_t>(7), v.asUint32());
+    BOOST_CHECK_EQUAL(static_cast<uint64_t>(7), v.asUint64());
+    BOOST_CHECK_EQUAL(std::string("7"), v.asString());
+    BOOST_CHECK_THROW(v.asInt8(), InvalidConversion);
+
+    // uint16 and uint32 values convert to string.
+    v = static_cast<uint16_t>(8);
+    BOOST_CHECK_EQUAL(std::string("8"), v.asString());
+    v = static_cast<uint32_t>(9);
+    BOOST_CHECK_EQUAL(std::string("9"), v.asString());
+
+    // A uint32 converts to uint32, uint64 and string; conversions to the
+    // narrower unsigned types and to int32 are rejected.
+    v = static_cast<uint32_t>(9999999);
+    BOOST_CHECK_EQUAL(static_cast<uint32_t>(9999999), v.asUint32());
+    BOOST_CHECK_EQUAL(static_cast<uint64_t>(9999999), v.asUint64());
+    BOOST_CHECK_EQUAL(std::string("9999999"), v.asString());
+    BOOST_CHECK_THROW(v.asUint8(), InvalidConversion);
+    BOOST_CHECK_THROW(v.asUint16(), InvalidConversion);
+    BOOST_CHECK_THROW(v.asInt32(), InvalidConversion);
+
+    // The strings "true"/"1" and "false"/"0" convert to bool; any other
+    // string is rejected.
+    v = "true";
+    BOOST_CHECK(v.asBool());
+    v = "false";
+    BOOST_CHECK(!v.asBool());
+    v = "1";
+    BOOST_CHECK(v.asBool());
+    v = "0";
+    BOOST_CHECK(!v.asBool());
+    v = "other";
+    BOOST_CHECK_THROW(v.asBool(), InvalidConversion);
+}
+
+// Checks that copying a Variant copies its type and value, and that assigning
+// a new value to the source afterwards leaves the copy unchanged.
+QPID_AUTO_TEST_CASE(testAssignment)
+{
+    Variant source("abc");
+    Variant copy = source;
+    BOOST_CHECK_EQUAL(STRING, source.getType());
+    BOOST_CHECK_EQUAL(copy.getType(), source.getType());
+    BOOST_CHECK_EQUAL(copy.asString(), source.asString());
+
+    // Re-assigning the source changes its type but not the type of the copy.
+    const uint32_t number(1000);
+    source = number;
+    BOOST_CHECK_EQUAL(UINT32, source.getType());
+    BOOST_CHECK_EQUAL(STRING, copy.getType());
+}
+
+// Builds a list Variant holding a string, a float and an int16, then walks the
+// list and checks the type and value of each element and that the list ends
+// after the third one.
+QPID_AUTO_TEST_CASE(testList)
+{
+    const std::string text("abc");
+    const float real(9.876);
+    const int16_t whole(1000);
+
+    Variant value = Variant::List();
+    value.asList().push_back(Variant(text));
+    value.asList().push_back(Variant(real));
+    value.asList().push_back(Variant(whole));
+    BOOST_CHECK_EQUAL(3u, value.asList().size());
+
+    Variant::List::const_iterator pos = value.asList().begin();
+
+    // First element: the string.
+    BOOST_CHECK(pos != value.asList().end());
+    BOOST_CHECK_EQUAL(STRING, pos->getType());
+    BOOST_CHECK_EQUAL(text, pos->asString());
+    ++pos;
+
+    // Second element: the float.
+    BOOST_CHECK(pos != value.asList().end());
+    BOOST_CHECK_EQUAL(FLOAT, pos->getType());
+    BOOST_CHECK_EQUAL(real, pos->asFloat());
+    ++pos;
+
+    // Third element: the int16.
+    BOOST_CHECK(pos != value.asList().end());
+    BOOST_CHECK_EQUAL(INT16, pos->getType());
+    BOOST_CHECK_EQUAL(whole, pos->asInt16());
+    ++pos;
+
+    BOOST_CHECK(pos == value.asList().end());
+}
+
+// Builds a map Variant with a string, a float and an int16 entry, checks the
+// type and value of each entry and then checks that an entry can be
+// overwritten with a value of a different type.
+QPID_AUTO_TEST_CASE(testMap)
+{
+    const std::string red("red");
+    const float pi(3.14);
+    const int16_t x(1000);
+
+    Variant value = Variant::Map();
+    Variant::Map& entries = value.asMap();
+    entries["colour"] = red;
+    entries["pi"] = pi;
+    entries["my-key"] = x;
+    BOOST_CHECK_EQUAL(3u, entries.size());
+
+    BOOST_CHECK_EQUAL(STRING, entries["colour"].getType());
+    BOOST_CHECK_EQUAL(red, entries["colour"].asString());
+
+    BOOST_CHECK_EQUAL(FLOAT, entries["pi"].getType());
+    BOOST_CHECK_EQUAL(pi, entries["pi"].asFloat());
+
+    BOOST_CHECK_EQUAL(INT16, entries["my-key"].getType());
+    BOOST_CHECK_EQUAL(x, entries["my-key"].asInt16());
+
+    // Overwriting an entry replaces both its value and its type.
+    entries["my-key"] = "now it's a string";
+    BOOST_CHECK_EQUAL(STRING, entries["my-key"].getType());
+    BOOST_CHECK_EQUAL(std::string("now it's a string"), entries["my-key"].asString());
+}
+ 
+QPID_AUTO_TEST_SUITE_END()



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org