You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2009/09/07 20:09:01 UTC

svn commit: r812243 - in /qpid/trunk/qpid/cpp: include/qpid/messaging/ src/ src/qpid/client/amqp0_10/ src/qpid/messaging/ src/tests/

Author: gsim
Date: Mon Sep  7 18:09:00 2009
New Revision: 812243

URL: http://svn.apache.org/viewvc?rev=812243&view=rev
Log:
QPID-664: Added automatic message replay on reconnection.


Added:
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/CodecsInternal.h
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h
Modified:
    qpid/trunk/qpid/cpp/include/qpid/messaging/Sender.h
    qpid/trunk/qpid/cpp/src/CMakeLists.txt
    qpid/trunk/qpid/cpp/src/Makefile.am
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/Sender.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/SenderImpl.h
    qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp

Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Sender.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Sender.h?rev=812243&r1=812242&r2=812243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Sender.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Sender.h Mon Sep  7 18:09:00 2009
@@ -47,7 +47,7 @@
     QPID_CLIENT_EXTERN ~Sender();
     QPID_CLIENT_EXTERN Sender& operator=(const Sender&);
 
-    QPID_CLIENT_EXTERN void send(Message& message);
+    QPID_CLIENT_EXTERN void send(const Message& message);
     QPID_CLIENT_EXTERN void cancel();
   private:
   friend class qpid::client::PrivateImplRef<Sender>;

Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=812243&r1=812242&r2=812243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Mon Sep  7 18:09:00 2009
@@ -540,6 +540,7 @@
      qpid/client/amqp0_10/AddressResolution.h
      qpid/client/amqp0_10/AddressResolution.cpp
      qpid/client/amqp0_10/Codecs.cpp
+     qpid/client/amqp0_10/CodecsInternal.h
      qpid/client/amqp0_10/CompletionTracker.h
      qpid/client/amqp0_10/CompletionTracker.cpp
      qpid/client/amqp0_10/ConnectionImpl.h
@@ -548,6 +549,8 @@
      qpid/client/amqp0_10/IncomingMessages.cpp
      qpid/client/amqp0_10/MessageSink.h
      qpid/client/amqp0_10/MessageSource.h
+     qpid/client/amqp0_10/OutgoingMessage.h
+     qpid/client/amqp0_10/OutgoingMessage.cpp
      qpid/client/amqp0_10/ReceiverImpl.h
      qpid/client/amqp0_10/ReceiverImpl.cpp
      qpid/client/amqp0_10/SessionImpl.h

Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=812243&r1=812242&r2=812243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Mon Sep  7 18:09:00 2009
@@ -700,6 +700,7 @@
   qpid/client/amqp0_10/AddressResolution.h	\
   qpid/client/amqp0_10/AddressResolution.cpp	\
   qpid/client/amqp0_10/Codecs.cpp		\
+  qpid/client/amqp0_10/CodecsInternal.h		\
   qpid/client/amqp0_10/ConnectionImpl.h	        \
   qpid/client/amqp0_10/ConnectionImpl.cpp	\
   qpid/client/amqp0_10/CompletionTracker.h	\
@@ -708,6 +709,8 @@
   qpid/client/amqp0_10/IncomingMessages.cpp	\
   qpid/client/amqp0_10/MessageSink.h		\
   qpid/client/amqp0_10/MessageSource.h		\
+  qpid/client/amqp0_10/OutgoingMessage.h	\
+  qpid/client/amqp0_10/OutgoingMessage.cpp	\
   qpid/client/amqp0_10/ReceiverImpl.h		\
   qpid/client/amqp0_10/ReceiverImpl.cpp		\
   qpid/client/amqp0_10/SessionImpl.h		\

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp?rev=812243&r1=812242&r2=812243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp Mon Sep  7 18:09:00 2009
@@ -22,6 +22,7 @@
 #include "qpid/client/amqp0_10/Codecs.h"
 #include "qpid/client/amqp0_10/MessageSource.h"
 #include "qpid/client/amqp0_10/MessageSink.h"
+#include "qpid/client/amqp0_10/OutgoingMessage.h"
 #include "qpid/messaging/Address.h"
 #include "qpid/messaging/Filter.h"
 #include "qpid/messaging/Message.h"
@@ -122,7 +123,7 @@
              bool passive = true, const std::string& type = EMPTY_STRING, bool durable = false, 
              const FieldTable& options = EMPTY_FIELD_TABLE);
     void declare(qpid::client::AsyncSession& session, const std::string& name);
-    void send(qpid::client::AsyncSession& session, const std::string& name, qpid::messaging::Message& message);
+    void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message);
     void cancel(qpid::client::AsyncSession& session, const std::string& name);
   private:
     const std::string name;
@@ -139,7 +140,7 @@
     QueueSink(const std::string& name, bool passive=true, bool exclusive=false, 
               bool autoDelete=false, bool durable=false, const FieldTable& options = EMPTY_FIELD_TABLE);
     void declare(qpid::client::AsyncSession& session, const std::string& name);
-    void send(qpid::client::AsyncSession& session, const std::string& name, qpid::messaging::Message& message);
+    void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message);
     void cancel(qpid::client::AsyncSession& session, const std::string& name);
   private:
     const std::string name;
@@ -328,14 +329,12 @@
     }
 }
 
-void Exchange::send(qpid::client::AsyncSession& session, const std::string&, qpid::messaging::Message& m)
+void Exchange::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
 {
-    qpid::client::Message message;
-    convert(m, message);
-    if (message.getDeliveryProperties().getRoutingKey().empty() && !defaultSubject.empty()) {
-        message.getDeliveryProperties().setRoutingKey(defaultSubject);
+    if (m.message.getDeliveryProperties().getRoutingKey().empty() && !defaultSubject.empty()) {
+        m.message.getDeliveryProperties().setRoutingKey(defaultSubject);
     }
-    session.messageTransfer(arg::destination=name, arg::content=message);
+    m.status = session.messageTransfer(arg::destination=name, arg::content=m.message);
 }
 
 void Exchange::cancel(qpid::client::AsyncSession&, const std::string&) {}
@@ -355,12 +354,10 @@
                                    arg::autoDelete=autoDelete, arg::arguments=options);
     }
 }
-void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, qpid::messaging::Message& m)
+void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
 {
-    qpid::client::Message message;
-    convert(m, message);
-    message.getDeliveryProperties().setRoutingKey(name);
-    session.messageTransfer(arg::content=message);
+    m.message.getDeliveryProperties().setRoutingKey(name);
+    m.status = session.messageTransfer(arg::content=m.message);
 }
 
 void QueueSink::cancel(qpid::client::AsyncSession&, const std::string&) {}

Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/CodecsInternal.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/CodecsInternal.h?rev=812243&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/CodecsInternal.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/CodecsInternal.h Mon Sep  7 18:09:00 2009
@@ -0,0 +1,41 @@
+#ifndef QPID_CLIENT_AMQP0_10_CODECSINTERNAL_H
+#define QPID_CLIENT_AMQP0_10_CODECSINTERNAL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/Variant.h"
+#include "qpid/framing/FieldTable.h"
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+/**
+ * Declarations of a couple of conversion functions implemented in
+ * Codecs.cpp but not exposed through the API.
+ */
+
+void translate(const qpid::messaging::Variant::Map& from, qpid::framing::FieldTable& to);
+void translate(const qpid::framing::FieldTable& from, qpid::messaging::Variant::Variant::Map& to);
+
+}}} // namespace qpid::client::amqp0_10
+
+#endif  /*!QPID_CLIENT_AMQP0_10_CODECSINTERNAL_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp?rev=812243&r1=812242&r2=812243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp Mon Sep  7 18:09:00 2009
@@ -21,6 +21,7 @@
 #include "qpid/client/amqp0_10/IncomingMessages.h"
 #include "qpid/client/amqp0_10/AddressResolution.h"
 #include "qpid/client/amqp0_10/Codecs.h"
+#include "qpid/client/amqp0_10/CodecsInternal.h"
 #include "qpid/client/SessionImpl.h"
 #include "qpid/client/SessionBase_0_10Access.h"
 #include "qpid/log/Statement.h"
@@ -195,8 +196,6 @@
     parent.retrieve(content, message);
 }
 
-void translate(const FieldTable& from, Variant::Map& to);//implemented in Codecs.cpp
-
 void populateHeaders(qpid::messaging::Message& message, 
                      const DeliveryProperties* deliveryProperties, 
                      const MessageProperties* messageProperties)

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h?rev=812243&r1=812242&r2=812243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h Mon Sep  7 18:09:00 2009
@@ -33,6 +33,8 @@
 namespace client {
 namespace amqp0_10 {
 
+class OutgoingMessage;
+
 /**
  *
  */
@@ -41,7 +43,7 @@
   public:
     virtual ~MessageSink() {}
     virtual void declare(qpid::client::AsyncSession& session, const std::string& name) = 0;
-    virtual void send(qpid::client::AsyncSession& session, const std::string& name, qpid::messaging::Message& message) = 0;
+    virtual void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message) = 0;
     virtual void cancel(qpid::client::AsyncSession& session, const std::string& name) = 0;
   private:
 };

Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp?rev=812243&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp Mon Sep  7 18:09:00 2009
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/amqp0_10/OutgoingMessage.h"
+#include "qpid/client/amqp0_10/AddressResolution.h"
+#include "qpid/client/amqp0_10/Codecs.h"
+#include "qpid/client/amqp0_10/CodecsInternal.h"
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/MessageImpl.h"
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+using qpid::messaging::Address;
+using qpid::messaging::MessageImplAccess;
+
+template <class T> void encode(const qpid::messaging::Message& from, qpid::client::Message& to)
+{
+    T codec;
+    MessageImplAccess::get(from).getEncodedContent(codec, to.getData());
+    to.getMessageProperties().setContentType(T::contentType);
+}
+
+void OutgoingMessage::convert(const qpid::messaging::Message& from)
+{
+    //TODO: need to avoid copying as much as possible
+    if (from.getContent().isList()) {
+        encode<ListCodec>(from, message);
+    } else if (from.getContent().isMap()) {
+        encode<MapCodec>(from, message);
+    } else {
+        message.setData(from.getBytes());
+        message.getMessageProperties().setContentType(from.getContentType());
+    }
+    const Address& address = from.getReplyTo();
+    if (!address.value.empty()) {
+        message.getMessageProperties().setReplyTo(AddressResolution::convert(address));
+    }
+    translate(from.getHeaders(), message.getMessageProperties().getApplicationHeaders());
+    //TODO: set other message properties
+    message.getDeliveryProperties().setRoutingKey(from.getSubject());
+    //TODO: set other delivery properties
+}
+
+}}} // namespace qpid::client::amqp0_10

Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h?rev=812243&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h Mon Sep  7 18:09:00 2009
@@ -0,0 +1,46 @@
+#ifndef QPID_CLIENT_AMQP0_10_OUTGOINGMESSAGE_H
+#define QPID_CLIENT_AMQP0_10_OUTGOINGMESSAGE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/Completion.h"
+#include "qpid/client/Message.h"
+
+namespace qpid {
+namespace messaging {
+class Message;
+}
+namespace client {
+namespace amqp0_10 {
+
+struct OutgoingMessage
+{
+    qpid::client::Message message;
+    qpid::client::Completion status;
+
+    void convert(const qpid::messaging::Message&);
+};
+
+
+
+}}} // namespace qpid::client::amqp0_10
+
+#endif  /*!QPID_CLIENT_AMQP0_10_OUTGOINGMESSAGE_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp?rev=812243&r1=812242&r2=812243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp Mon Sep  7 18:09:00 2009
@@ -22,6 +22,7 @@
 #include "MessageSink.h"
 #include "SessionImpl.h"
 #include "AddressResolution.h"
+#include "OutgoingMessage.h"
 
 namespace qpid {
 namespace client {
@@ -30,9 +31,10 @@
 SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name, 
                        const qpid::messaging::Address& _address, 
                        const qpid::messaging::Variant::Map& _options) : 
-    parent(_parent), name(_name), address(_address), options(_options), state(UNRESOLVED) {}
+    parent(_parent), name(_name), address(_address), options(_options), state(UNRESOLVED),
+    capacity(50), window(0) {}
 
-void SenderImpl::send(qpid::messaging::Message& m) 
+void SenderImpl::send(const qpid::messaging::Message& m) 
 {
     execute1<Send>(&m);
 }
@@ -54,14 +56,36 @@
         parent.senderCancelled(name);
     } else {
         sink->declare(session, name);
-        //TODO: replay
+        replay();
     }
 }
 
-void SenderImpl::sendImpl(qpid::messaging::Message& m) 
+void SenderImpl::sendImpl(const qpid::messaging::Message& m) 
 {
-    //TODO: record for replay if appropriate
-    sink->send(session, name, m);
+    //TODO: make recording for replay optional
+    std::auto_ptr<OutgoingMessage> msg(new OutgoingMessage());
+    msg->convert(m);
+    outgoing.push_back(msg.release());
+    sink->send(session, name, outgoing.back());
+    if (++window > (capacity / 2)) {//TODO: make this configurable?
+        session.flush();
+        checkPendingSends();
+        window = 0;
+    }
+}
+
+void SenderImpl::replay()
+{
+    for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); ++i) {
+        sink->send(session, name, *i);
+    }
+}
+
+void SenderImpl::checkPendingSends()
+{
+    while (!outgoing.empty() && outgoing.front().status.isComplete()) {
+        outgoing.pop_front();
+    }
 }
 
 void SenderImpl::cancelImpl()

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h?rev=812243&r1=812242&r2=812243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h Mon Sep  7 18:09:00 2009
@@ -28,6 +28,7 @@
 #include "qpid/client/AsyncSession.h"
 #include "qpid/client/amqp0_10/SessionImpl.h"
 #include <memory>
+#include <boost/ptr_container/ptr_deque.hpp>
 
 namespace qpid {
 namespace client {
@@ -35,6 +36,7 @@
 
 class AddressResolution;
 class MessageSink;
+class OutgoingMessage;
 
 /**
  *
@@ -47,7 +49,7 @@
     SenderImpl(SessionImpl& parent, const std::string& name, 
                const qpid::messaging::Address& address, 
                const qpid::messaging::Variant::Map& options);
-    void send(qpid::messaging::Message&);
+    void send(const qpid::messaging::Message&);
     void cancel();
     void init(qpid::client::AsyncSession, AddressResolution&);
 
@@ -63,8 +65,16 @@
     std::string destination;
     std::string routingKey;
 
+    typedef boost::ptr_deque<OutgoingMessage> OutgoingMessages;
+    OutgoingMessages outgoing;
+    uint32_t capacity;
+    uint32_t window;
+
+    void checkPendingSends();
+    void replay();
+
     //logic for application visible methods:
-    void sendImpl(qpid::messaging::Message&);
+    void sendImpl(const qpid::messaging::Message&);
     void cancelImpl();
 
     //functors for application visible methods (allowing locking and
@@ -78,9 +88,9 @@
 
     struct Send : Command
     {
-        qpid::messaging::Message* message;
+        const qpid::messaging::Message* message;
 
-        Send(SenderImpl& i, qpid::messaging::Message* m) : Command(i), message(m) {}
+        Send(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m) {}
         void operator()() { impl.sendImpl(*message); }
     };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.cpp?rev=812243&r1=812242&r2=812243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.cpp Mon Sep  7 18:09:00 2009
@@ -126,6 +126,15 @@
     }
 }
 
+void MessageImpl::getEncodedContent(Codec& codec, std::string& out) const
+{
+    if (content.getType() != VAR_VOID) {
+        codec.encode(content, out);
+    } else {
+        out = bytes;
+    }
+}
+
 void MessageImpl::decode(Codec& codec) 
 {
     codec.decode(bytes, content);    
@@ -188,5 +197,9 @@
 {
     return *msg.impl;
 }
+const MessageImpl& MessageImplAccess::get(const Message& msg)
+{
+    return *msg.impl;
+}
 
 }} // namespace qpid::messaging

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.h?rev=812243&r1=812242&r2=812243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.h Mon Sep  7 18:09:00 2009
@@ -84,6 +84,7 @@
 
     void clear();
 
+    void getEncodedContent(Codec& codec, std::string&) const;
     void encode(Codec& codec);
     void decode(Codec& codec);
 
@@ -125,6 +126,7 @@
 struct MessageImplAccess
 {
     static MessageImpl& get(Message&);
+    static const MessageImpl& get(const Message&);
 };
 
 }} // namespace qpid::messaging

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Sender.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Sender.cpp?rev=812243&r1=812242&r2=812243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Sender.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Sender.cpp Mon Sep  7 18:09:00 2009
@@ -38,7 +38,7 @@
 Sender::Sender(const Sender& s) : qpid::client::Handle<SenderImpl>() { PI::copy(*this, s); }
 Sender::~Sender() { PI::dtor(*this); }
 Sender& Sender::operator=(const Sender& s) { return PI::assign(*this, s); }
-void Sender::send(Message& message) { impl->send(message); }
+void Sender::send(const Message& message) { impl->send(message); }
 void Sender::cancel() { impl->cancel(); }
 
 }} // namespace qpid::messaging

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/SenderImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/SenderImpl.h?rev=812243&r1=812242&r2=812243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/SenderImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/SenderImpl.h Mon Sep  7 18:09:00 2009
@@ -35,7 +35,7 @@
 {
   public:
     virtual ~SenderImpl() {}
-    virtual void send(Message& message) = 0;
+    virtual void send(const Message& message) = 0;
     virtual void cancel() = 0;
   private:
 };

Modified: qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=812243&r1=812242&r2=812243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp Mon Sep  7 18:09:00 2009
@@ -310,7 +310,6 @@
     sender.send(out);
     Receiver receiver = fix.session.createReceiver(fix.queue);
     Message in = receiver.fetch(5 * qpid::sys::TIME_SEC);    
-    BOOST_CHECK_EQUAL(in.getBytes(), out.getBytes());
     BOOST_CHECK_EQUAL(in.getContent().asMap()["abc"].asString(), "def");
     BOOST_CHECK_EQUAL(in.getContent().asMap()["pi"].asFloat(), 3.14f);
     fix.session.acknowledge();
@@ -329,7 +328,6 @@
     sender.send(out);
     Receiver receiver = fix.session.createReceiver(fix.queue);
     Message in = receiver.fetch(5 * qpid::sys::TIME_SEC);    
-    BOOST_CHECK_EQUAL(in.getBytes(), out.getBytes());
     Variant::List& list = in.getContent().asList();    
     BOOST_CHECK_EQUAL(list.size(), out.getContent().asList().size());
     BOOST_CHECK_EQUAL(list.front().asString(), "abc");



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org