You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/04/05 21:16:18 UTC
svn commit: r525932 - in /incubator/qpid/trunk/qpid/cpp/src: ./ client/
framing/ tests/
Author: aconway
Date: Thu Apr 5 12:16:09 2007
New Revision: 525932
URL: http://svn.apache.org/viewvc?view=rev&rev=525932
Log:
* Extended use of shared pointers for frame bodies across all send() commands.
* tests/Makefile.am: added check-unit target to run just unit tests.
* Introduced make_shared_ptr convenience function for wrapping
plain pointers with shared_ptr.
* cpp/src/client/ClientChannel.h,cpp (sendAndReceive,sendAndReceiveSync):
Pass shared_ptr instead of raw ptr to fix memory problems.
Updated the following files to use make_shared_ptr
- src/client/BasicMessageChannel.cpp
- src/client/ClientConnection.cpp
* src/client/MessageMessageChannel.cpp: implemented 0-9 message.get.
* src/framing/Correlator.h,cpp: Allow request sender to register actions
to take when the correlated response arrives.
* cpp/src/tests/FramingTest.cpp: Added Correlator tests.
* src/framing/ChannelAdapter.h,cpp: use Correlator to dispatch
response actions.
* cpp/src/shared_ptr.h (make_shared_ptr): Convenience function
to make a shared pointer from a raw pointer.
* cpp/src/tests/ClientChannelTest.cpp: Added message.get test.
* cpp/src/tests/Makefile.am (check-unit): Added check-unit target
to run unit tests.
Added:
incubator/qpid/trunk/qpid/cpp/src/framing/Correlator.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/framing/Correlator.h (with props)
Modified:
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/client/BasicMessageChannel.cpp
incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.cpp
incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.h
incubator/qpid/trunk/qpid/cpp/src/client/ClientConnection.cpp
incubator/qpid/trunk/qpid/cpp/src/client/ClientMessage.h
incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.cpp
incubator/qpid/trunk/qpid/cpp/src/framing/ChannelAdapter.cpp
incubator/qpid/trunk/qpid/cpp/src/framing/ChannelAdapter.h
incubator/qpid/trunk/qpid/cpp/src/framing/Requester.h
incubator/qpid/trunk/qpid/cpp/src/shared_ptr.h
incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?view=diff&rev=525932&r1=525931&r2=525932
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Thu Apr 5 12:16:09 2007
@@ -92,10 +92,11 @@
$(framing)/ProtocolVersionException.cpp \
$(framing)/Requester.cpp \
$(framing)/Responder.cpp \
+ $(framing)/Correlator.cpp \
$(framing)/Value.cpp \
$(framing)/Proxy.cpp \
$(gen)/AMQP_ClientProxy.cpp \
- $(gen)/AMQP_HighestVersion.h \
+ $(gen)/AMQP_HighestVersion.h \
$(gen)/AMQP_MethodVersionMap.cpp \
$(gen)/AMQP_ServerProxy.cpp \
Exception.cpp \
Modified: incubator/qpid/trunk/qpid/cpp/src/client/BasicMessageChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/client/BasicMessageChannel.cpp?view=diff&rev=525932&r1=525931&r2=525932
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/client/BasicMessageChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/client/BasicMessageChannel.cpp Thu Apr 5 12:16:09 2007
@@ -81,10 +81,10 @@
BasicConsumeOkBody::shared_ptr ok =
channel.sendAndReceiveSync<BasicConsumeOkBody>(
synch,
- new BasicConsumeBody(
+ make_shared_ptr(new BasicConsumeBody(
channel.version, 0, queue.getName(), tag, noLocal,
ackMode == NO_ACK, false, !synch,
- fields ? *fields : FieldTable()));
+ fields ? *fields : FieldTable())));
tag = ok->getConsumerTag();
}
@@ -102,7 +102,7 @@
if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true));
channel.sendAndReceiveSync<BasicCancelOkBody>(
- synch, new BasicCancelBody(channel.version, tag, !synch));
+ synch, make_shared_ptr(new BasicCancelBody(channel.version, tag, !synch)));
}
void BasicMessageChannel::close(){
@@ -337,9 +337,9 @@
void BasicMessageChannel::setQos(){
channel.sendAndReceive<BasicQosOkBody>(
- new BasicQosBody(channel.version, 0, channel.getPrefetch(), false));
+ make_shared_ptr(new BasicQosBody(channel.version, 0, channel.getPrefetch(), false)));
if(channel.isTransactional())
- channel.sendAndReceive<TxSelectOkBody>(new TxSelectBody(channel.version));
+ channel.sendAndReceive<TxSelectOkBody>(make_shared_ptr(new TxSelectBody(channel.version)));
}
}} // namespace qpid::client
Modified: incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.cpp?view=diff&rev=525932&r1=525931&r2=525932
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.cpp Thu Apr 5 12:16:09 2007
@@ -60,7 +60,7 @@
init(id, con, con.getVersion()); // ChannelAdapter initialization.
string oob;
if (id != 0)
- sendAndReceive<ChannelOpenOkBody>(new ChannelOpenBody(version, oob));
+ sendAndReceive<ChannelOpenOkBody>(make_shared_ptr(new ChannelOpenBody(version, oob)));
}
void Channel::protocolInit(
@@ -77,10 +77,10 @@
string locale("en_US");
ConnectionTuneBody::shared_ptr proposal =
sendAndReceive<ConnectionTuneBody>(
- new ConnectionStartOkBody(
+ make_shared_ptr(new ConnectionStartOkBody(
version, connectionStart->getRequestId(),
props, mechanism,
- response, locale));
+ response, locale)));
/**
* Assume for now that further challenges will not be required
@@ -136,15 +136,15 @@
FieldTable args;
sendAndReceiveSync<ExchangeDeclareOkBody>(
synch,
- new ExchangeDeclareBody(
- version, 0, name, type, false, false, false, false, !synch, args));
+ make_shared_ptr(new ExchangeDeclareBody(
+ version, 0, name, type, false, false, false, false, !synch, args)));
}
void Channel::deleteExchange(Exchange& exchange, bool synch){
string name = exchange.getName();
sendAndReceiveSync<ExchangeDeleteOkBody>(
synch,
- new ExchangeDeleteBody(version, 0, name, false, !synch));
+ make_shared_ptr(new ExchangeDeleteBody(version, 0, name, false, !synch)));
}
void Channel::declareQueue(Queue& queue, bool synch){
@@ -153,9 +153,9 @@
QueueDeclareOkBody::shared_ptr response =
sendAndReceiveSync<QueueDeclareOkBody>(
synch,
- new QueueDeclareBody(
+ make_shared_ptr(new QueueDeclareBody(
version, 0, name, false/*passive*/, queue.isDurable(),
- queue.isExclusive(), queue.isAutoDelete(), !synch, args));
+ queue.isExclusive(), queue.isAutoDelete(), !synch, args)));
if(synch) {
if(queue.getName().length() == 0)
queue.setName(response->getQueue());
@@ -167,7 +167,7 @@
string name = queue.getName();
sendAndReceiveSync<QueueDeleteOkBody>(
synch,
- new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch));
+ make_shared_ptr(new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch)));
}
void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
@@ -175,15 +175,15 @@
string q = queue.getName();
sendAndReceiveSync<QueueBindOkBody>(
synch,
- new QueueBindBody(version, 0, q, e, key,!synch, args));
+ make_shared_ptr(new QueueBindBody(version, 0, q, e, key,!synch, args)));
}
void Channel::commit(){
- sendAndReceive<TxCommitOkBody>(new TxCommitBody(version));
+ sendAndReceive<TxCommitOkBody>(make_shared_ptr(new TxCommitBody(version)));
}
void Channel::rollback(){
- sendAndReceive<TxRollbackOkBody>(new TxRollbackBody(version));
+ sendAndReceive<TxRollbackOkBody>(make_shared_ptr(new TxRollbackBody(version)));
}
void Channel::handleMethodInContext(
@@ -203,7 +203,8 @@
}
try {
switch (method->amqpClassId()) {
- case BasicDeliverBody::CLASS_ID: messaging->handle(method); break;
+ case MessageOkBody::CLASS_ID:
+ case BasicGetOkBody::CLASS_ID: messaging->handle(method); break;
case ChannelCloseBody::CLASS_ID: handleChannel(method); break;
case ConnectionCloseBody::CLASS_ID: handleConnection(method); break;
default: throw UnknownMethod();
@@ -261,8 +262,8 @@
try {
if (getId() != 0) {
sendAndReceive<ChannelCloseOkBody>(
- new ChannelCloseBody(
- version, code, text, classId, methodId));
+ make_shared_ptr(new ChannelCloseBody(
+ version, code, text, classId, methodId)));
}
static_cast<ConnectionForChannel*>(connection)->erase(getId());
closeInternal();
@@ -292,7 +293,7 @@
}
AMQMethodBody::shared_ptr Channel::sendAndReceive(
- AMQMethodBody* toSend, ClassId c, MethodId m)
+ AMQMethodBody::shared_ptr toSend, ClassId c, MethodId m)
{
responses.expect();
send(toSend);
@@ -300,7 +301,7 @@
}
AMQMethodBody::shared_ptr Channel::sendAndReceiveSync(
- bool sync, AMQMethodBody* body, ClassId c, MethodId m)
+ bool sync, AMQMethodBody::shared_ptr body, ClassId c, MethodId m)
{
if(sync)
return sendAndReceive(body, c, m);
Modified: incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.h?view=diff&rev=525932&r1=525931&r2=525932
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.h Thu Apr 5 12:16:09 2007
@@ -56,6 +56,7 @@
{
private:
struct UnknownMethod {};
+ typedef shared_ptr<framing::AMQMethodBody> MethodPtr;
sys::Mutex lock;
boost::scoped_ptr<MessageChannel> messaging;
@@ -82,21 +83,23 @@
const std::string& vhost);
framing::AMQMethodBody::shared_ptr sendAndReceive(
- framing::AMQMethodBody*, framing::ClassId, framing::MethodId);
+ framing::AMQMethodBody::shared_ptr,
+ framing::ClassId, framing::MethodId);
framing::AMQMethodBody::shared_ptr sendAndReceiveSync(
bool sync,
- framing::AMQMethodBody*, framing::ClassId, framing::MethodId);
+ framing::AMQMethodBody::shared_ptr,
+ framing::ClassId, framing::MethodId);
template <class BodyType>
- boost::shared_ptr<BodyType> sendAndReceive(framing::AMQMethodBody* body) {
+ boost::shared_ptr<BodyType> sendAndReceive(framing::AMQMethodBody::shared_ptr body) {
return boost::shared_polymorphic_downcast<BodyType>(
sendAndReceive(body, BodyType::CLASS_ID, BodyType::METHOD_ID));
}
template <class BodyType>
boost::shared_ptr<BodyType> sendAndReceiveSync(
- bool sync, framing::AMQMethodBody* body) {
+ bool sync, framing::AMQMethodBody::shared_ptr body) {
return boost::shared_polymorphic_downcast<BodyType>(
sendAndReceiveSync(
sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID));
Modified: incubator/qpid/trunk/qpid/cpp/src/client/ClientConnection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/client/ClientConnection.cpp?view=diff&rev=525932&r1=525931&r2=525932
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/client/ClientConnection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/client/ClientConnection.cpp Thu Apr 5 12:16:09 2007
@@ -87,8 +87,8 @@
// partly closed with threads left unjoined.
isOpen = false;
channel0.sendAndReceive<ConnectionCloseOkBody>(
- new ConnectionCloseBody(
- getVersion(), code, msg, classId, methodId));
+ make_shared_ptr(new ConnectionCloseBody(
+ getVersion(), code, msg, classId, methodId)));
using boost::bind;
for_each(channels.begin(), channels.end(),
Modified: incubator/qpid/trunk/qpid/cpp/src/client/ClientMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/client/ClientMessage.h?view=diff&rev=525932&r1=525931&r2=525932
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/client/ClientMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/client/ClientMessage.h Thu Apr 5 12:16:09 2007
@@ -33,6 +33,8 @@
*
* \ingroup clientapi
*/
+// FIXME aconway 2007-04-05: Should be based on MessageTransfer properties not
+// basic header properties.
class Message : public framing::BasicHeaderProperties {
public:
Message(const std::string& data_=std::string()) : data(data_) {}
Modified: incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.cpp?view=diff&rev=525932&r1=525931&r2=525932
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.cpp Thu Apr 5 12:16:09 2007
@@ -25,6 +25,7 @@
#include "../framing/FieldTable.h"
#include "Connection.h"
#include "../shared_ptr.h"
+#include <boost/bind.hpp>
namespace qpid {
namespace client {
@@ -48,9 +49,9 @@
if (tag.empty())
tag = newTag();
channel.sendAndReceive<MessageOkBody>(
- new MessageConsumeBody(
+ make_shared_ptr(new MessageConsumeBody(
channel.getVersion(), 0, queue.getName(), tag, noLocal,
- ackMode == NO_ACK, false, fields ? *fields : FieldTable()));
+ ackMode == NO_ACK, false, fields ? *fields : FieldTable())));
// // FIXME aconway 2007-02-20: Race condition!
// // We could receive the first message for the consumer
@@ -115,16 +116,44 @@
*/
const string getDestinationId("__get__");
+/**
+ * A destination that provides a Correlator::Action to handle
+ * MessageEmpty responses.
+ */
+struct MessageGetDestination : public IncomingMessage::WaitableDestination
+{
+ void response(shared_ptr<AMQResponseBody> response) {
+ if (response->amqpClassId() == MessageOkBody::CLASS_ID) {
+ switch (response->amqpMethodId()) {
+ case MessageOkBody::METHOD_ID:
+ // Nothing to do, wait for transfer.
+ return;
+ case MessageEmptyBody::METHOD_ID:
+ empty(); // Wake up waiter with empty queue.
+ return;
+ }
+ }
+ throw QPID_ERROR(PROTOCOL_ERROR, "Invalid response");
+ }
+
+ Correlator::Action action() {
+ return boost::bind(&MessageGetDestination::response, this, _1);
+ }
+};
+
bool MessageMessageChannel::get(
- Message& , const Queue& , AckMode )
+ Message& msg, const Queue& queue, AckMode ackMode)
{
Mutex::ScopedLock l(lock);
-// incoming.addDestination(getDestinationId, getDest);
-// channel.send(
-// new MessageGetBody(
-// channel.version, 0, queue.getName(), getDestinationId, ackMode));
-// return getDest.wait(msg);
- return false;
+ std::string destName=newTag();
+ MessageGetDestination dest;
+ incoming.addDestination(destName, dest);
+ channel.send(
+ make_shared_ptr(
+ new MessageGetBody(
+ channel.version, 0, queue.getName(), destName, ackMode)),
+ dest.action());
+ return dest.wait(msg);
}
@@ -176,9 +205,30 @@
// FIXME aconway 2007-02-23:
throw QPID_ERROR(INTERNAL_ERROR, "References not yet implemented");
}
- channel.sendAndReceive<MessageOkBody>(transfer.get());
+ channel.sendAndReceive<MessageOkBody>(transfer);
}
+void copy(Message& msg, MessageTransferBody& transfer) {
+ // FIXME aconway 2007-04-05: Verify all required fields
+ // are copied.
+ msg.setContentType(transfer.getContentType());
+ msg.setContentEncoding(transfer.getContentEncoding());
+ msg.setHeaders(transfer.getApplicationHeaders());
+ msg.setDeliveryMode(DeliveryMode(transfer.getDeliveryMode()));
+ msg.setPriority(transfer.getPriority());
+ msg.setCorrelationId(transfer.getCorrelationId());
+ msg.setReplyTo(transfer.getReplyTo());
+ // FIXME aconway 2007-04-05: TTL/Expiration
+ msg.setMessageId(transfer.getMessageId());
+ msg.setTimestamp(transfer.getTimestamp());
+ msg.setUserId(transfer.getUserId());
+ msg.setAppId(transfer.getAppId());
+ msg.setDestination(transfer.getDestination());
+ msg.setRedelivered(transfer.getRedelivered());
+ msg.setDeliveryTag(0); // No meaning in 0-9
+ if (transfer.getBody().isInline())
+ msg.setData(transfer.getBody().getValue());
+}
void MessageMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) {
assert(method->amqpClassId() ==MessageTransferBody::CLASS_ID);
@@ -203,23 +253,38 @@
break;
}
- case MessageEmptyBody::METHOD_ID: {
- // FIXME aconway 2007-04-04:
- // getDest.empty();
+ case MessageTransferBody::METHOD_ID: {
+ MessageTransferBody::shared_ptr transfer=
+ shared_polymorphic_downcast<MessageTransferBody>(method);
+ if (transfer->getBody().isInline()) {
+ Message msg;
+ copy(msg, *transfer);
+ // Deliver it.
+ incoming.getDestination(transfer->getDestination()).message(msg);
+ }
+ else {
+ Message& msg=incoming.createMessage(
+ transfer->getDestination(), transfer->getBody().getValue());
+ copy(msg, *transfer);
+ // Will be delivered when reference closes.
+ }
break;
}
- case MessageCancelBody::METHOD_ID:
- case MessageCheckpointBody::METHOD_ID:
+ case MessageEmptyBody::METHOD_ID:
+ case MessageOkBody::METHOD_ID:
+ // Nothing to do
+ break;
// FIXME aconway 2007-04-03: TODO
- case MessageOkBody::METHOD_ID:
+ case MessageCancelBody::METHOD_ID:
+ case MessageCheckpointBody::METHOD_ID:
case MessageOffsetBody::METHOD_ID:
case MessageQosBody::METHOD_ID:
case MessageRecoverBody::METHOD_ID:
case MessageRejectBody::METHOD_ID:
case MessageResumeBody::METHOD_ID:
- case MessageTransferBody::METHOD_ID:
+ break;
default:
throw Channel::UnknownMethod();
}
@@ -322,10 +387,10 @@
void MessageMessageChannel::setQos(){
channel.sendAndReceive<MessageOkBody>(
- new MessageQosBody(channel.version, 0, channel.getPrefetch(), false));
+ make_shared_ptr(new MessageQosBody(channel.version, 0, channel.getPrefetch(), false)));
if(channel.isTransactional())
channel.sendAndReceive<TxSelectOkBody>(
- new TxSelectBody(channel.version));
+ make_shared_ptr(new TxSelectBody(channel.version)));
}
}} // namespace qpid::client
Modified: incubator/qpid/trunk/qpid/cpp/src/framing/ChannelAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/framing/ChannelAdapter.cpp?view=diff&rev=525932&r1=525931&r2=525932
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/framing/ChannelAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/framing/ChannelAdapter.cpp Thu Apr 5 12:16:09 2007
@@ -35,15 +35,19 @@
version = v;
}
-RequestId ChannelAdapter::send(AMQBody::shared_ptr body) {
- RequestId result = 0;
+RequestId ChannelAdapter::send(
+ shared_ptr<AMQBody> body, Correlator::Action action)
+{
+ RequestId requestId = 0;
assertChannelOpen();
switch (body->type()) {
case REQUEST_BODY: {
AMQRequestBody::shared_ptr request =
boost::shared_polymorphic_downcast<AMQRequestBody>(body);
requester.sending(request->getData());
- result = request->getData().requestId;
+ requestId = request->getData().requestId;
+ if (!action.empty())
+ correlator.request(requestId, action);
break;
}
case RESPONSE_BODY: {
@@ -52,9 +56,10 @@
responder.sending(response->getData());
break;
}
+ // No action required for other body types.
}
out->send(new AMQFrame(getVersion(), getId(), body));
- return result;
+ return requestId;
}
void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) {
@@ -66,10 +71,15 @@
void ChannelAdapter::handleResponse(AMQResponseBody::shared_ptr response) {
assertMethodOk(*response);
- // TODO aconway 2007-01-30: Consider a response handled on receipt.
- // Review - any cases where this is not the case?
AMQResponseBody::Data& responseData = response->getData();
+
+ // FIXME aconway 2007-04-05: processed should be last
+ // but causes problems with InProcessBroker tests because
+ // we execute client code in handleMethod.
+ // Need to introduce a queue & 2 threads for inprocess.
requester.processed(responseData);
+ // FIXME aconway 2007-04-04: exception handling.
+ correlator.response(response);
handleMethod(response);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/framing/ChannelAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/framing/ChannelAdapter.h?view=diff&rev=525932&r1=525931&r2=525932
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/framing/ChannelAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/framing/ChannelAdapter.h Thu Apr 5 12:16:09 2007
@@ -22,11 +22,11 @@
*
*/
-#include <boost/shared_ptr.hpp>
-
+#include "../shared_ptr.h"
#include "BodyHandler.h"
#include "Requester.h"
#include "Responder.h"
+#include "Correlator.h"
#include "amqp_types.h"
namespace qpid {
@@ -64,17 +64,24 @@
ChannelId getId() const { return id; }
ProtocolVersion getVersion() const { return version; }
-
+
/**
- * Wrap body in a frame and send the frame.
- * Takes ownership of body.
+ * Send a frame.
+ *@param body Body of the frame.
+ *@param action optional action to execute when we receive a
+ *response to this frame. Ignored if body is not a Request.
+ *@return If body is a request, the ID assigned else 0.
*/
- RequestId send(AMQBody::shared_ptr body);
+ RequestId send(shared_ptr<AMQBody> body,
+ Correlator::Action action=Correlator::Action());
+
+ // TODO aconway 2007-04-05: remove and use make_shared_ptr at call sites.
+ /**@deprecated Use make_shared_ptr with the other send() overload */
RequestId send(AMQBody* body) { return send(AMQBody::shared_ptr(body)); }
- void handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody>);
- void handleRequest(boost::shared_ptr<qpid::framing::AMQRequestBody>);
- void handleResponse(boost::shared_ptr<qpid::framing::AMQResponseBody>);
+ void handleMethod(shared_ptr<AMQMethodBody>);
+ void handleRequest(shared_ptr<AMQRequestBody>);
+ void handleResponse(shared_ptr<AMQResponseBody>);
virtual bool isOpen() const = 0;
@@ -84,7 +91,7 @@
void assertChannelNotOpen() const;
virtual void handleMethodInContext(
- boost::shared_ptr<qpid::framing::AMQMethodBody> method,
+ shared_ptr<AMQMethodBody> method,
const MethodContext& context) = 0;
RequestId getFirstAckRequest() { return requester.getFirstAckRequest(); }
@@ -97,6 +104,7 @@
ProtocolVersion version;
Requester requester;
Responder responder;
+ Correlator correlator;
};
}}
Added: incubator/qpid/trunk/qpid/cpp/src/framing/Correlator.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/framing/Correlator.cpp?view=auto&rev=525932
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/framing/Correlator.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/framing/Correlator.cpp Thu Apr 5 12:16:09 2007
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "Correlator.h"
+
+namespace qpid {
+namespace framing {
+
+void Correlator::request(RequestId id, Action action) {
+ actions[id] = action;
+}
+
+// Run, then drop, the action of each pending request covered by response r.
+bool Correlator::response(shared_ptr<AMQResponseBody> r) {
+ Actions::iterator begin = actions.lower_bound(r->getRequestId());
+ Actions::iterator end = actions.upper_bound(r->getRequestId()+r->getBatchOffset());
+ bool didAction = (begin != end);
+ for(Actions::iterator i=begin; i != end; ) {
+ // FIXME aconway 2007-04-04: Exception handling.
+ i->second(r);
+ // Advance before erasing: std::map::erase invalidates the erased iterator.
+ actions.erase(i++);
+ }
+ return didAction;
+}
+
+}} // namespace qpid::framing
Propchange: incubator/qpid/trunk/qpid/cpp/src/framing/Correlator.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/framing/Correlator.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/framing/Correlator.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/framing/Correlator.h?view=auto&rev=525932
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/framing/Correlator.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/framing/Correlator.h Thu Apr 5 12:16:09 2007
@@ -0,0 +1,68 @@
+#ifndef _framing_Correlator_h
+#define _framing_Correlator_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "../shared_ptr.h"
+#include "../framing/AMQResponseBody.h"
+#include <boost/function.hpp>
+#include <map>
+
+namespace qpid {
+namespace framing {
+
+/**
+ * Correlate responses with actions established when sending the request.
+ *
+ * THREAD UNSAFE.
+ */
+class Correlator
+{
+ public:
+ typedef shared_ptr<AMQResponseBody> ResponsePtr;
+ typedef boost::function<void (ResponsePtr)> Action;
+
+ /**
+ * Note that the request with the given id was sent; record an action
+ * to call when a response arrives.
+ */
+ void request(RequestId id, Action doOnResponse);
+
+ /**
+ * Note response received, call action for associated request if any.
+ * Return true if some action(s) were executed.
+ */
+ bool response(shared_ptr<AMQResponseBody>);
+
+ /**
+ * Note the given execution mark was received, call actions
+ * for any requests that are implicitly responded to.
+ */
+ void mark(RequestId mark);
+
+ private:
+ typedef std::map<RequestId, Action> Actions;
+ Actions actions;
+};
+
+}} // namespace qpid::framing
+
+
+
+#endif /*!_framing_Correlator_h*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/framing/Correlator.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/framing/Correlator.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/framing/Requester.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/framing/Requester.h?view=diff&rev=525932&r1=525931&r2=525932
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/framing/Requester.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/framing/Requester.h Thu Apr 5 12:16:09 2007
@@ -32,8 +32,7 @@
/**
* Manage request IDs and the response mark for locally initiated requests.
*
- * THREAD UNSAFE: This class is called as frames are sent or received
- * sequentially on a connection, so it does not need to be thread safe.
+ * THREAD UNSAFE: must be locked externally.
*/
class Requester
{
@@ -46,12 +45,14 @@
/** Called after processing a response. */
void processed(const AMQResponseBody::Data&);
- /** Get the next request id to be used. */
- RequestId getNextId() { return lastId + 1; }
- /** Get the first request acked by this response */
- RequestId getFirstAckRequest() { return firstAckRequest; }
- /** Get the last request acked by this response */
- RequestId getLastAckRequest() { return lastAckRequest; }
+ /** Get the next request id to be used. */
+ RequestId getNextId() { return lastId + 1; }
+
+ /** Get the first request acked by last response */
+ RequestId getFirstAckRequest() { return firstAckRequest; }
+
+ /** Get the last request acked by last response */
+ RequestId getLastAckRequest() { return lastAckRequest; }
private:
RequestId lastId;
Modified: incubator/qpid/trunk/qpid/cpp/src/shared_ptr.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/shared_ptr.h?view=diff&rev=525932&r1=525931&r2=525932
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/shared_ptr.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/shared_ptr.h Thu Apr 5 12:16:09 2007
@@ -23,12 +23,20 @@
#include <boost/cast.hpp>
namespace qpid {
-/// Import shared_ptr definitions into qpid namespace.
+
+// Import shared_ptr definitions into qpid namespace and define some
+// useful shared_ptr templates for convenience.
+
using boost::shared_ptr;
using boost::dynamic_pointer_cast;
using boost::static_pointer_cast;
using boost::const_pointer_cast;
using boost::shared_polymorphic_downcast;
+
+template <class T> shared_ptr<T> make_shared_ptr(T* ptr) {
+ return shared_ptr<T>(ptr);
+}
+
} // namespace qpid
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp?view=diff&rev=525932&r1=525931&r2=525932
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp Thu Apr 5 12:16:09 2007
@@ -27,6 +27,7 @@
#include "../client/ClientExchange.h"
#include "../client/MessageListener.h"
#include "../client/BasicMessageChannel.h"
+#include "../client/MessageMessageChannel.h"
using namespace std;
using namespace boost;
@@ -203,7 +204,17 @@
}
};
+class MessageClientChannelTest : public ClientChannelTestBase {
+ CPPUNIT_TEST_SUITE(MessageClientChannelTest);
+ CPPUNIT_TEST(testPublishGet);
+ CPPUNIT_TEST_SUITE_END();
+ public:
+ MessageClientChannelTest() {
+ channel.reset(new Channel(false, 500, Channel::AMQP_09));
+ }
+};
// Make this test suite a plugin.
CPPUNIT_PLUGIN_IMPLEMENT();
CPPUNIT_TEST_SUITE_REGISTRATION(BasicClientChannelTest);
+CPPUNIT_TEST_SUITE_REGISTRATION(MessageClientChannelTest);
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp?view=diff&rev=525932&r1=525931&r2=525932
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp Thu Apr 5 12:16:09 2007
@@ -18,9 +18,6 @@
* under the License.
*
*/
-#include <memory>
-#include <boost/lexical_cast.hpp>
-
#include "ConnectionRedirectBody.h"
#include "../framing/ProtocolVersion.h"
#include "../framing/amqp_framing.h"
@@ -38,6 +35,11 @@
#include "../client/Connection.h"
#include "../client/ClientExchange.h"
#include "../client/ClientQueue.h"
+#include "../framing/Correlator.h"
+#include "BasicGetOkBody.h"
+#include <memory>
+#include <boost/lexical_cast.hpp>
+#include <boost/bind.hpp>
using namespace qpid;
using namespace qpid::framing;
@@ -65,6 +67,7 @@
CPPUNIT_TEST(testResponseBodyFrame);
CPPUNIT_TEST(testRequester);
CPPUNIT_TEST(testResponder);
+ CPPUNIT_TEST(testCorrelator);
CPPUNIT_TEST(testInlineContent);
CPPUNIT_TEST(testContentReference);
CPPUNIT_TEST(testContentValidation);
@@ -300,7 +303,7 @@
Responder r;
AMQRequestBody::Data q;
AMQResponseBody::Data p;
-
+
q.requestId = 1;
q.responseMark = 0;
r.received(q);
@@ -333,6 +336,48 @@
// TODO aconway 2007-01-14: Test for batching when supported.
+ }
+
+
+ std::vector<Correlator::ResponsePtr> correlations;
+
+ void correlatorCallback(Correlator::ResponsePtr r) {
+ correlations.push_back(r);
+ }
+
+ struct DummyResponse : public AMQResponseBody {
+ DummyResponse(ResponseId id=0, RequestId req=0, BatchOffset off=0)
+ : AMQResponseBody(version, id, req, off) {}
+ uint32_t size() const { return 0; }
+ void print(std::ostream&) const {}
+ MethodId amqpMethodId() const { return 0; }
+ ClassId amqpClassId() const { return 0; }
+ void encodeContent(Buffer& ) const {}
+ void decodeContent(Buffer& ) {}
+ };
+
+ void testCorrelator() {
+ CPPUNIT_ASSERT(correlations.empty());
+ Correlator c;
+ Correlator::Action action = boost::bind(&FramingTest::correlatorCallback, this, _1);
+ c.request(5, action);
+ Correlator::ResponsePtr r1(new DummyResponse(3, 5, 0));
+ CPPUNIT_ASSERT(c.response(r1));
+ CPPUNIT_ASSERT_EQUAL(size_t(1), correlations.size());
+ CPPUNIT_ASSERT(correlations.front() == r1);
+ correlations.clear();
+
+ c.request(6, action);
+ c.request(7, action);
+ c.request(8, action);
+ Correlator::ResponsePtr r2(new DummyResponse(4, 6, 3));
+ CPPUNIT_ASSERT(c.response(r2));
+ CPPUNIT_ASSERT_EQUAL(size_t(3), correlations.size());
+ CPPUNIT_ASSERT(r2 == correlations[0]);
+ CPPUNIT_ASSERT(r2 == correlations[1]);
+ CPPUNIT_ASSERT(r2 == correlations[2]);
+ Correlator::ResponsePtr r3(new DummyResponse(5, 99, 0));
+ CPPUNIT_ASSERT(!c.response(r3));
}
// expect may contain null chars so use string(ptr,size) constructor
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?view=diff&rev=525932&r1=525931&r2=525932
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Thu Apr 5 12:16:09 2007
@@ -110,6 +110,9 @@
check: .valgrindrc $(check_LTLIBRARIES) $(lib_common) $(lib_client) $(lib_broker)
+check-unit:
+ $(MAKE) check TESTS=run-unit-tests
+
# Create a copy so user can modify without risk of checking in their mods.
.valgrindrc: .valgrindrc-default
cp .valgrindrc-default .valgrindrc