You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2009/09/07 20:09:01 UTC
svn commit: r812243 - in /qpid/trunk/qpid/cpp: include/qpid/messaging/ src/
src/qpid/client/amqp0_10/ src/qpid/messaging/ src/tests/
Author: gsim
Date: Mon Sep 7 18:09:00 2009
New Revision: 812243
URL: http://svn.apache.org/viewvc?rev=812243&view=rev
Log:
QPID-664: Added automatic message replay on reconnection.
Added:
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/CodecsInternal.h
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h
Modified:
qpid/trunk/qpid/cpp/include/qpid/messaging/Sender.h
qpid/trunk/qpid/cpp/src/CMakeLists.txt
qpid/trunk/qpid/cpp/src/Makefile.am
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.h
qpid/trunk/qpid/cpp/src/qpid/messaging/Sender.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/SenderImpl.h
qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Sender.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Sender.h?rev=812243&r1=812242&r2=812243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Sender.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Sender.h Mon Sep 7 18:09:00 2009
@@ -47,7 +47,7 @@
QPID_CLIENT_EXTERN ~Sender();
QPID_CLIENT_EXTERN Sender& operator=(const Sender&);
- QPID_CLIENT_EXTERN void send(Message& message);
+ QPID_CLIENT_EXTERN void send(const Message& message);
QPID_CLIENT_EXTERN void cancel();
private:
friend class qpid::client::PrivateImplRef<Sender>;
Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=812243&r1=812242&r2=812243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Mon Sep 7 18:09:00 2009
@@ -540,6 +540,7 @@
qpid/client/amqp0_10/AddressResolution.h
qpid/client/amqp0_10/AddressResolution.cpp
qpid/client/amqp0_10/Codecs.cpp
+ qpid/client/amqp0_10/CodecsInternal.h
qpid/client/amqp0_10/CompletionTracker.h
qpid/client/amqp0_10/CompletionTracker.cpp
qpid/client/amqp0_10/ConnectionImpl.h
@@ -548,6 +549,8 @@
qpid/client/amqp0_10/IncomingMessages.cpp
qpid/client/amqp0_10/MessageSink.h
qpid/client/amqp0_10/MessageSource.h
+ qpid/client/amqp0_10/OutgoingMessage.h
+ qpid/client/amqp0_10/OutgoingMessage.cpp
qpid/client/amqp0_10/ReceiverImpl.h
qpid/client/amqp0_10/ReceiverImpl.cpp
qpid/client/amqp0_10/SessionImpl.h
Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=812243&r1=812242&r2=812243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Mon Sep 7 18:09:00 2009
@@ -700,6 +700,7 @@
qpid/client/amqp0_10/AddressResolution.h \
qpid/client/amqp0_10/AddressResolution.cpp \
qpid/client/amqp0_10/Codecs.cpp \
+ qpid/client/amqp0_10/CodecsInternal.h \
qpid/client/amqp0_10/ConnectionImpl.h \
qpid/client/amqp0_10/ConnectionImpl.cpp \
qpid/client/amqp0_10/CompletionTracker.h \
@@ -708,6 +709,8 @@
qpid/client/amqp0_10/IncomingMessages.cpp \
qpid/client/amqp0_10/MessageSink.h \
qpid/client/amqp0_10/MessageSource.h \
+ qpid/client/amqp0_10/OutgoingMessage.h \
+ qpid/client/amqp0_10/OutgoingMessage.cpp \
qpid/client/amqp0_10/ReceiverImpl.h \
qpid/client/amqp0_10/ReceiverImpl.cpp \
qpid/client/amqp0_10/SessionImpl.h \
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp?rev=812243&r1=812242&r2=812243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp Mon Sep 7 18:09:00 2009
@@ -22,6 +22,7 @@
#include "qpid/client/amqp0_10/Codecs.h"
#include "qpid/client/amqp0_10/MessageSource.h"
#include "qpid/client/amqp0_10/MessageSink.h"
+#include "qpid/client/amqp0_10/OutgoingMessage.h"
#include "qpid/messaging/Address.h"
#include "qpid/messaging/Filter.h"
#include "qpid/messaging/Message.h"
@@ -122,7 +123,7 @@
bool passive = true, const std::string& type = EMPTY_STRING, bool durable = false,
const FieldTable& options = EMPTY_FIELD_TABLE);
void declare(qpid::client::AsyncSession& session, const std::string& name);
- void send(qpid::client::AsyncSession& session, const std::string& name, qpid::messaging::Message& message);
+ void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message);
void cancel(qpid::client::AsyncSession& session, const std::string& name);
private:
const std::string name;
@@ -139,7 +140,7 @@
QueueSink(const std::string& name, bool passive=true, bool exclusive=false,
bool autoDelete=false, bool durable=false, const FieldTable& options = EMPTY_FIELD_TABLE);
void declare(qpid::client::AsyncSession& session, const std::string& name);
- void send(qpid::client::AsyncSession& session, const std::string& name, qpid::messaging::Message& message);
+ void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message);
void cancel(qpid::client::AsyncSession& session, const std::string& name);
private:
const std::string name;
@@ -328,14 +329,12 @@
}
}
-void Exchange::send(qpid::client::AsyncSession& session, const std::string&, qpid::messaging::Message& m)
+void Exchange::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
{
- qpid::client::Message message;
- convert(m, message);
- if (message.getDeliveryProperties().getRoutingKey().empty() && !defaultSubject.empty()) {
- message.getDeliveryProperties().setRoutingKey(defaultSubject);
+ if (m.message.getDeliveryProperties().getRoutingKey().empty() && !defaultSubject.empty()) {
+ m.message.getDeliveryProperties().setRoutingKey(defaultSubject);
}
- session.messageTransfer(arg::destination=name, arg::content=message);
+ m.status = session.messageTransfer(arg::destination=name, arg::content=m.message);
}
void Exchange::cancel(qpid::client::AsyncSession&, const std::string&) {}
@@ -355,12 +354,10 @@
arg::autoDelete=autoDelete, arg::arguments=options);
}
}
-void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, qpid::messaging::Message& m)
+void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
{
- qpid::client::Message message;
- convert(m, message);
- message.getDeliveryProperties().setRoutingKey(name);
- session.messageTransfer(arg::content=message);
+ m.message.getDeliveryProperties().setRoutingKey(name);
+ m.status = session.messageTransfer(arg::content=m.message);
}
void QueueSink::cancel(qpid::client::AsyncSession&, const std::string&) {}
Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/CodecsInternal.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/CodecsInternal.h?rev=812243&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/CodecsInternal.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/CodecsInternal.h Mon Sep 7 18:09:00 2009
@@ -0,0 +1,41 @@
+#ifndef QPID_CLIENT_AMQP0_10_CODECSINTERNAL_H
+#define QPID_CLIENT_AMQP0_10_CODECSINTERNAL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/Variant.h"
+#include "qpid/framing/FieldTable.h"
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+/**
+ * Declarations of a couple of conversion functions implemented in
+ * Codecs.cpp but not exposed through API
+ */
+
+void translate(const qpid::messaging::Variant::Map& from, qpid::framing::FieldTable& to);
+void translate(const qpid::framing::FieldTable& from, qpid::messaging::Variant::Variant::Map& to);
+
+}}} // namespace qpid::client::amqp0_10
+
+#endif /*!QPID_CLIENT_AMQP0_10_CODECSINTERNAL_H*/
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp?rev=812243&r1=812242&r2=812243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp Mon Sep 7 18:09:00 2009
@@ -21,6 +21,7 @@
#include "qpid/client/amqp0_10/IncomingMessages.h"
#include "qpid/client/amqp0_10/AddressResolution.h"
#include "qpid/client/amqp0_10/Codecs.h"
+#include "qpid/client/amqp0_10/CodecsInternal.h"
#include "qpid/client/SessionImpl.h"
#include "qpid/client/SessionBase_0_10Access.h"
#include "qpid/log/Statement.h"
@@ -195,8 +196,6 @@
parent.retrieve(content, message);
}
-void translate(const FieldTable& from, Variant::Map& to);//implemented in Codecs.cpp
-
void populateHeaders(qpid::messaging::Message& message,
const DeliveryProperties* deliveryProperties,
const MessageProperties* messageProperties)
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h?rev=812243&r1=812242&r2=812243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h Mon Sep 7 18:09:00 2009
@@ -33,6 +33,8 @@
namespace client {
namespace amqp0_10 {
+class OutgoingMessage;
+
/**
*
*/
@@ -41,7 +43,7 @@
public:
virtual ~MessageSink() {}
virtual void declare(qpid::client::AsyncSession& session, const std::string& name) = 0;
- virtual void send(qpid::client::AsyncSession& session, const std::string& name, qpid::messaging::Message& message) = 0;
+ virtual void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message) = 0;
virtual void cancel(qpid::client::AsyncSession& session, const std::string& name) = 0;
private:
};
Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp?rev=812243&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp Mon Sep 7 18:09:00 2009
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/amqp0_10/OutgoingMessage.h"
+#include "qpid/client/amqp0_10/AddressResolution.h"
+#include "qpid/client/amqp0_10/Codecs.h"
+#include "qpid/client/amqp0_10/CodecsInternal.h"
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/MessageImpl.h"
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+using qpid::messaging::Address;
+using qpid::messaging::MessageImplAccess;
+
+template <class T> void encode(const qpid::messaging::Message& from, qpid::client::Message& to)
+{
+ T codec;
+ MessageImplAccess::get(from).getEncodedContent(codec, to.getData());
+ to.getMessageProperties().setContentType(T::contentType);
+}
+
+void OutgoingMessage::convert(const qpid::messaging::Message& from)
+{
+ //TODO: need to avoid copying as much as possible
+ if (from.getContent().isList()) {
+ encode<ListCodec>(from, message);
+ } else if (from.getContent().isMap()) {
+ encode<MapCodec>(from, message);
+ } else {
+ message.setData(from.getBytes());
+ message.getMessageProperties().setContentType(from.getContentType());
+ }
+ const Address& address = from.getReplyTo();
+ if (!address.value.empty()) {
+ message.getMessageProperties().setReplyTo(AddressResolution::convert(address));
+ }
+ translate(from.getHeaders(), message.getMessageProperties().getApplicationHeaders());
+ //TODO: set other message properties
+ message.getDeliveryProperties().setRoutingKey(from.getSubject());
+ //TODO: set other delivery properties
+}
+
+}}} // namespace qpid::client::amqp0_10
Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h?rev=812243&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h Mon Sep 7 18:09:00 2009
@@ -0,0 +1,46 @@
+#ifndef QPID_CLIENT_AMQP0_10_OUTGOINGMESSAGE_H
+#define QPID_CLIENT_AMQP0_10_OUTGOINGMESSAGE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/Completion.h"
+#include "qpid/client/Message.h"
+
+namespace qpid {
+namespace messaging {
+class Message;
+}
+namespace client {
+namespace amqp0_10 {
+
+struct OutgoingMessage
+{
+ qpid::client::Message message;
+ qpid::client::Completion status;
+
+ void convert(const qpid::messaging::Message&);
+};
+
+
+
+}}} // namespace qpid::client::amqp0_10
+
+#endif /*!QPID_CLIENT_AMQP0_10_OUTGOINGMESSAGE_H*/
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp?rev=812243&r1=812242&r2=812243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp Mon Sep 7 18:09:00 2009
@@ -22,6 +22,7 @@
#include "MessageSink.h"
#include "SessionImpl.h"
#include "AddressResolution.h"
+#include "OutgoingMessage.h"
namespace qpid {
namespace client {
@@ -30,9 +31,10 @@
SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name,
const qpid::messaging::Address& _address,
const qpid::messaging::Variant::Map& _options) :
- parent(_parent), name(_name), address(_address), options(_options), state(UNRESOLVED) {}
+ parent(_parent), name(_name), address(_address), options(_options), state(UNRESOLVED),
+ capacity(50), window(0) {}
-void SenderImpl::send(qpid::messaging::Message& m)
+void SenderImpl::send(const qpid::messaging::Message& m)
{
execute1<Send>(&m);
}
@@ -54,14 +56,36 @@
parent.senderCancelled(name);
} else {
sink->declare(session, name);
- //TODO: replay
+ replay();
}
}
-void SenderImpl::sendImpl(qpid::messaging::Message& m)
+void SenderImpl::sendImpl(const qpid::messaging::Message& m)
{
- //TODO: record for replay if appropriate
- sink->send(session, name, m);
+ //TODO: make recording for replay optional
+ std::auto_ptr<OutgoingMessage> msg(new OutgoingMessage());
+ msg->convert(m);
+ outgoing.push_back(msg.release());
+ sink->send(session, name, outgoing.back());
+ if (++window > (capacity / 2)) {//TODO: make this configurable?
+ session.flush();
+ checkPendingSends();
+ window = 0;
+ }
+}
+
+void SenderImpl::replay()
+{
+ for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); ++i) {
+ sink->send(session, name, *i);
+ }
+}
+
+void SenderImpl::checkPendingSends()
+{
+ while (!outgoing.empty() && outgoing.front().status.isComplete()) {
+ outgoing.pop_front();
+ }
}
void SenderImpl::cancelImpl()
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h?rev=812243&r1=812242&r2=812243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h Mon Sep 7 18:09:00 2009
@@ -28,6 +28,7 @@
#include "qpid/client/AsyncSession.h"
#include "qpid/client/amqp0_10/SessionImpl.h"
#include <memory>
+#include <boost/ptr_container/ptr_deque.hpp>
namespace qpid {
namespace client {
@@ -35,6 +36,7 @@
class AddressResolution;
class MessageSink;
+class OutgoingMessage;
/**
*
@@ -47,7 +49,7 @@
SenderImpl(SessionImpl& parent, const std::string& name,
const qpid::messaging::Address& address,
const qpid::messaging::Variant::Map& options);
- void send(qpid::messaging::Message&);
+ void send(const qpid::messaging::Message&);
void cancel();
void init(qpid::client::AsyncSession, AddressResolution&);
@@ -63,8 +65,16 @@
std::string destination;
std::string routingKey;
+ typedef boost::ptr_deque<OutgoingMessage> OutgoingMessages;
+ OutgoingMessages outgoing;
+ uint32_t capacity;
+ uint32_t window;
+
+ void checkPendingSends();
+ void replay();
+
//logic for application visible methods:
- void sendImpl(qpid::messaging::Message&);
+ void sendImpl(const qpid::messaging::Message&);
void cancelImpl();
//functors for application visible methods (allowing locking and
@@ -78,9 +88,9 @@
struct Send : Command
{
- qpid::messaging::Message* message;
+ const qpid::messaging::Message* message;
- Send(SenderImpl& i, qpid::messaging::Message* m) : Command(i), message(m) {}
+ Send(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m) {}
void operator()() { impl.sendImpl(*message); }
};
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.cpp?rev=812243&r1=812242&r2=812243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.cpp Mon Sep 7 18:09:00 2009
@@ -126,6 +126,15 @@
}
}
+void MessageImpl::getEncodedContent(Codec& codec, std::string& out) const
+{
+ if (content.getType() != VAR_VOID) {
+ codec.encode(content, out);
+ } else {
+ out = bytes;
+ }
+}
+
void MessageImpl::decode(Codec& codec)
{
codec.decode(bytes, content);
@@ -188,5 +197,9 @@
{
return *msg.impl;
}
+const MessageImpl& MessageImplAccess::get(const Message& msg)
+{
+ return *msg.impl;
+}
}} // namespace qpid::messaging
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.h?rev=812243&r1=812242&r2=812243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.h Mon Sep 7 18:09:00 2009
@@ -84,6 +84,7 @@
void clear();
+ void getEncodedContent(Codec& codec, std::string&) const;
void encode(Codec& codec);
void decode(Codec& codec);
@@ -125,6 +126,7 @@
struct MessageImplAccess
{
static MessageImpl& get(Message&);
+ static const MessageImpl& get(const Message&);
};
}} // namespace qpid::messaging
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Sender.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Sender.cpp?rev=812243&r1=812242&r2=812243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Sender.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Sender.cpp Mon Sep 7 18:09:00 2009
@@ -38,7 +38,7 @@
Sender::Sender(const Sender& s) : qpid::client::Handle<SenderImpl>() { PI::copy(*this, s); }
Sender::~Sender() { PI::dtor(*this); }
Sender& Sender::operator=(const Sender& s) { return PI::assign(*this, s); }
-void Sender::send(Message& message) { impl->send(message); }
+void Sender::send(const Message& message) { impl->send(message); }
void Sender::cancel() { impl->cancel(); }
}} // namespace qpid::messaging
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/SenderImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/SenderImpl.h?rev=812243&r1=812242&r2=812243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/SenderImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/SenderImpl.h Mon Sep 7 18:09:00 2009
@@ -35,7 +35,7 @@
{
public:
virtual ~SenderImpl() {}
- virtual void send(Message& message) = 0;
+ virtual void send(const Message& message) = 0;
virtual void cancel() = 0;
private:
};
Modified: qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=812243&r1=812242&r2=812243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp Mon Sep 7 18:09:00 2009
@@ -310,7 +310,6 @@
sender.send(out);
Receiver receiver = fix.session.createReceiver(fix.queue);
Message in = receiver.fetch(5 * qpid::sys::TIME_SEC);
- BOOST_CHECK_EQUAL(in.getBytes(), out.getBytes());
BOOST_CHECK_EQUAL(in.getContent().asMap()["abc"].asString(), "def");
BOOST_CHECK_EQUAL(in.getContent().asMap()["pi"].asFloat(), 3.14f);
fix.session.acknowledge();
@@ -329,7 +328,6 @@
sender.send(out);
Receiver receiver = fix.session.createReceiver(fix.queue);
Message in = receiver.fetch(5 * qpid::sys::TIME_SEC);
- BOOST_CHECK_EQUAL(in.getBytes(), out.getBytes());
Variant::List& list = in.getContent().asList();
BOOST_CHECK_EQUAL(list.size(), out.getContent().asList().size());
BOOST_CHECK_EQUAL(list.front().asString(), "abc");
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org