You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/02/02 23:03:12 UTC
svn commit: r502767 [2/2] - in /incubator/qpid/branches/qpid.0-9:
cpp/lib/broker/ cpp/lib/client/ cpp/lib/common/framing/ cpp/tests/
gentools/src/org/apache/qpid/gentools/ gentools/templ.cpp/
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/LazyLoadedContent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/LazyLoadedContent.cpp?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/LazyLoadedContent.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/LazyLoadedContent.cpp Fri Feb 2 14:03:10 2007
@@ -20,6 +20,7 @@
*/
#include <LazyLoadedContent.h>
#include "AMQFrame.h"
+#include "framing/ChannelAdapter.h"
using namespace qpid::broker;
using namespace qpid::framing;
@@ -37,19 +38,21 @@
return 0;//all content is written as soon as it is added
}
-void LazyLoadedContent::send(const qpid::framing::ProtocolVersion& version, OutputHandler* out, int channel, u_int32_t framesize)
+void LazyLoadedContent::send(ChannelAdapter& channel, u_int32_t framesize)
{
if (expectedSize > framesize) {
- for (u_int64_t offset = 0; offset < expectedSize; offset += framesize) {
+ for (u_int64_t offset = 0; offset < expectedSize; offset += framesize)
+ {
u_int64_t remaining = expectedSize - offset;
string data;
- store->loadContent(msg, data, offset, remaining > framesize ? framesize : remaining);
- out->send(new AMQFrame(version, channel, new AMQContentBody(data)));
+ store->loadContent(msg, data, offset,
+ remaining > framesize ? framesize : remaining);
+ channel.send(new AMQContentBody(data));
}
} else {
string data;
store->loadContent(msg, data, 0, expectedSize);
- out->send(new AMQFrame(version, channel, new AMQContentBody(data)));
+ channel.send(new AMQContentBody(data));
}
}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/LazyLoadedContent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/LazyLoadedContent.h?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/LazyLoadedContent.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/LazyLoadedContent.h Fri Feb 2 14:03:10 2007
@@ -31,10 +31,14 @@
Message* const msg;
const u_int64_t expectedSize;
public:
- LazyLoadedContent(MessageStore* const store, Message* const msg, u_int64_t expectedSize);
+ LazyLoadedContent(
+ MessageStore* const store, Message* const msg,
+ u_int64_t expectedSize);
void add(qpid::framing::AMQContentBody::shared_ptr data);
u_int32_t size();
- void send(const qpid::framing::ProtocolVersion& version, qpid::framing::OutputHandler* out, int channel, u_int32_t framesize);
+ void send(
+ framing::ChannelAdapter&,
+ u_int32_t framesize);
void encode(qpid::framing::Buffer& buffer);
void destroy();
~LazyLoadedContent(){}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageBuilder.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageBuilder.h?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageBuilder.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageBuilder.h Fri Feb 2 14:03:10 2007
@@ -39,7 +39,9 @@
virtual void complete(Message::shared_ptr&) = 0;
virtual ~CompletionHandler(){}
};
- MessageBuilder(CompletionHandler* _handler, MessageStore* const store = 0, u_int64_t stagingThreshold = 0);
+ MessageBuilder(CompletionHandler* _handler,
+ MessageStore* const store = 0,
+ u_int64_t stagingThreshold = 0);
void initialise(Message::shared_ptr& msg);
void setHeader(qpid::framing::AMQHeaderBody::shared_ptr& header);
void addContent(qpid::framing::AMQContentBody::shared_ptr& content);
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp Fri Feb 2 14:03:10 2007
@@ -34,8 +34,8 @@
//
void
MessageHandlerImpl::append(const MethodContext&,
- const string& /*reference*/,
- const string& /*bytes*/ )
+ const string& /*reference*/,
+ const string& /*bytes*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
@@ -43,7 +43,7 @@
void
MessageHandlerImpl::cancel( const MethodContext& context,
- const string& destination )
+ const string& destination )
{
//assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
@@ -54,28 +54,28 @@
void
MessageHandlerImpl::checkpoint(const MethodContext&,
- const string& /*reference*/,
- const string& /*identifier*/ )
+ const string& /*reference*/,
+ const string& /*identifier*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
MessageHandlerImpl::close(const MethodContext&,
- const string& /*reference*/ )
+ const string& /*reference*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
MessageHandlerImpl::consume(const MethodContext& context,
- u_int16_t /*ticket*/,
- const string& queueName,
- const string& destination,
- bool noLocal,
- bool noAck,
- bool exclusive,
- const qpid::framing::FieldTable& filter )
+ u_int16_t /*ticket*/,
+ const string& queueName,
+ const string& destination,
+ bool noLocal,
+ bool noAck,
+ bool exclusive,
+ const qpid::framing::FieldTable& filter )
{
//assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
@@ -106,15 +106,15 @@
void
MessageHandlerImpl::get( const MethodContext& context,
- u_int16_t /*ticket*/,
- const string& queueName,
- const string& /*destination*/,
- bool noAck )
+ u_int16_t /*ticket*/,
+ const string& queueName,
+ const string& /*destination*/,
+ bool noAck )
{
//assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
Queue::shared_ptr queue =
- connection.getQueue(queueName, context.channelId);
+ connection.getQueue(queueName, context.channel->getId());
// FIXME: get is probably Basic specific
if(!channel.get(queue, !noAck)){
@@ -125,7 +125,7 @@
void
MessageHandlerImpl::offset(const MethodContext&,
- u_int64_t /*value*/ )
+ u_int64_t /*value*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
@@ -138,16 +138,16 @@
void
MessageHandlerImpl::open(const MethodContext&,
- const string& /*reference*/ )
+ const string& /*reference*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
MessageHandlerImpl::qos(const MethodContext& context,
- u_int32_t prefetchSize,
- u_int16_t prefetchCount,
- bool /*global*/ )
+ u_int32_t prefetchSize,
+ u_int16_t prefetchCount,
+ bool /*global*/ )
{
//assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
@@ -160,7 +160,7 @@
void
MessageHandlerImpl::recover(const MethodContext&,
- bool requeue )
+ bool requeue )
{
//assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
@@ -170,45 +170,45 @@
void
MessageHandlerImpl::reject(const MethodContext&,
- u_int16_t /*code*/,
- const string& /*text*/ )
+ u_int16_t /*code*/,
+ const string& /*text*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
MessageHandlerImpl::resume(const MethodContext&,
- const string& /*reference*/,
- const string& /*identifier*/ )
+ const string& /*reference*/,
+ const string& /*identifier*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
MessageHandlerImpl::transfer(const MethodContext& context,
- u_int16_t /*ticket*/,
- const string& /*destination*/,
- bool /*redelivered*/,
- bool immediate,
- u_int64_t /*ttl*/,
- u_int8_t /*priority*/,
- u_int64_t /*timestamp*/,
- u_int8_t /*deliveryMode*/,
- u_int64_t /*expiration*/,
- const string& exchangeName,
- const string& routingKey,
- const string& /*messageId*/,
- const string& /*correlationId*/,
- const string& /*replyTo*/,
- const string& /*contentType*/,
- const string& /*contentEncoding*/,
- const string& /*userId*/,
- const string& /*appId*/,
- const string& /*transactionId*/,
- const string& /*securityToken*/,
- const qpid::framing::FieldTable& /*applicationHeaders*/,
- qpid::framing::Content body,
- bool mandatory )
+ u_int16_t /*ticket*/,
+ const string& /*destination*/,
+ bool /*redelivered*/,
+ bool immediate,
+ u_int64_t /*ttl*/,
+ u_int8_t /*priority*/,
+ u_int64_t /*timestamp*/,
+ u_int8_t /*deliveryMode*/,
+ u_int64_t /*expiration*/,
+ const string& exchangeName,
+ const string& routingKey,
+ const string& /*messageId*/,
+ const string& /*correlationId*/,
+ const string& /*replyTo*/,
+ const string& /*contentType*/,
+ const string& /*contentEncoding*/,
+ const string& /*userId*/,
+ const string& /*appId*/,
+ const string& /*transactionId*/,
+ const string& /*securityToken*/,
+ const qpid::framing::FieldTable& /*applicationHeaders*/,
+ qpid::framing::Content body,
+ bool mandatory )
{
//assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
@@ -216,14 +216,15 @@
broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName);
if(exchange){
if (body.isInline()) {
- MessageMessage* msg =
- new MessageMessage(*(context.methodBody), exchangeName, routingKey, mandatory, immediate);
- channel.handlePublish(msg, exchange);
+ MessageMessage* msg =
+ new MessageMessage(context.methodBody, exchangeName,
+ routingKey, mandatory, immediate);
+ channel.handlePublish(msg, exchange);
- connection.client->getMessageHandler()->ok(context);
+ connection.client->getMessageHandler()->ok(context);
} else {
- // Don't handle reference content yet
- assert(body.isInline());
+ // Don't handle reference content yet
+ assert(body.isInline());
}
}else{
throw ChannelException(404, "Exchange not found '" + exchangeName + "'");
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/NullMessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/NullMessageStore.h?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/NullMessageStore.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/NullMessageStore.h Fri Feb 2 14:03:10 2007
@@ -34,7 +34,7 @@
class NullMessageStore : public MessageStore{
const bool warn;
public:
- NullMessageStore(bool warn = true);
+ NullMessageStore(bool warn = false);
virtual void create(const Queue& queue, const qpid::framing::FieldTable& settings);
virtual void destroy(const Queue& queue);
virtual void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0);
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp Fri Feb 2 14:03:10 2007
@@ -75,7 +75,7 @@
ConnectionTuneBody::shared_ptr proposal =
sendAndReceive<ConnectionTuneBody>(
new ConnectionStartOkBody(
- version, props, mechanism, response, locale));
+ version, responses.getRequestId(), props, mechanism, response, locale));
/**
* Assume for now that further challenges will not be required
@@ -85,9 +85,9 @@
connection->send(new AMQFrame(0, new ConnectionSecureOkBody(response)));
**/
- (new ConnectionTuneOkBody(
- version, proposal->getChannelMax(), connection->getMaxFrameSize(),
- proposal->getHeartbeat()))->send(context);
+ send(new ConnectionTuneOkBody(
+ version, responses.getRequestId(), proposal->getChannelMax(), connection->getMaxFrameSize(),
+ proposal->getHeartbeat()));
u_int16_t heartbeat = proposal->getHeartbeat();
connection->connector->setReadTimeout(heartbeat * 2);
@@ -96,8 +96,7 @@
// Send connection open.
std::string capabilities;
responses.expect();
- (new ConnectionOpenBody(version, vhost, capabilities, true))
- ->send(context);
+ send(new ConnectionOpenBody(version, vhost, capabilities, true));
//receive connection.open-ok (or redirect, but ignore that for now
//esp. as using force=true).
responses.waitForResponse();
@@ -208,8 +207,7 @@
if (i != consumers.end()) {
Consumer& c = i->second;
if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
- (new BasicAckBody(version, c.lastDeliveryTag, true))
- ->send(context);
+ send(new BasicAckBody(version, c.lastDeliveryTag, true));
sendAndReceiveSync<BasicCancelOkBody>(
synch, new BasicCancelBody(version, tag, !synch));
consumers.erase(tag);
@@ -227,8 +225,7 @@
// trying the rest. NB no memory leaks if we do,
// ConsumerMap holds values, not pointers.
//
- (new BasicAckBody(version, c.lastDeliveryTag, true))
- ->send(context);
+ send(new BasicAckBody(version, c.lastDeliveryTag, true));
}
}
}
@@ -249,7 +246,7 @@
bool Channel::get(Message& msg, const Queue& queue, int ackMode) {
string name = queue.getName();
responses.expect();
- (new BasicGetBody(version, 0, name, ackMode))->send(context);
+ send(new BasicGetBody(version, 0, name, ackMode));
responses.waitForResponse();
AMQMethodBody::shared_ptr response = responses.getResponse();
if(response->isA<BasicGetOkBody>()) {
@@ -277,7 +274,7 @@
string e = exchange.getName();
string key = routingKey;
- (new BasicPublishBody(version, 0, e, key, mandatory, immediate))->send(context);
+ send(new BasicPublishBody(version, 0, e, key, mandatory, immediate));
//break msg up into header frame and content frame(s) and send these
string data = msg.getData();
msg.header->setContentSize(data.length());
@@ -426,8 +423,7 @@
if(++(consumer.count) < prefetch) break;
//else drop-through
case AUTO_ACK:
- (new BasicAckBody(version, msg.getDeliveryTag(), multiple))
- ->send(context);
+ send(new BasicAckBody(version, msg.getDeliveryTag(), multiple));
consumer.lastDeliveryTag = 0;
}
}
@@ -512,7 +508,7 @@
void Channel::sendAndReceive(AMQMethodBody* toSend, ClassId c, MethodId m)
{
responses.expect();
- toSend->send(context);
+ send(toSend);
responses.receive(c, m);
}
@@ -522,7 +518,7 @@
if(sync)
sendAndReceive(body, c, m);
else
- body->send(context);
+ send(body);
}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.cpp?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.cpp Fri Feb 2 14:03:10 2007
@@ -63,7 +63,7 @@
THROW_QPID_ERROR(
PROTOCOL_ERROR, "Channel closed unexpectedly.");
}
- if(!validate(response->amqpClassId(), response->amqpMethodId())) {
+ if(!validate(response->amqpClassId(), response->amqpMethodId())) {
THROW_QPID_ERROR(
PROTOCOL_ERROR,
boost::format("Expected class:method %d:%d, got %d:%d")
@@ -71,6 +71,10 @@
}
}
+RequestId ResponseHandler::getRequestId() {
+ assert(response->getRequestId());
+ return response->getRequestId();
+}
void ResponseHandler::expect(){
waiting = true;
}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.h?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.h Fri Feb 2 14:03:10 2007
@@ -20,8 +20,7 @@
*/
#include <string>
-#include <amqp_types.h>
-#include <framing/amqp_framing.h>
+#include <framing/amqp_framing.h> // FIXME aconway 2007-02-01: #include cleanup.
#include <sys/Monitor.h>
#ifndef _ResponseHandler_
@@ -52,11 +51,13 @@
bool validate(framing::ClassId, framing::MethodId);
void receive(framing::ClassId, framing::MethodId);
+ framing::RequestId getRequestId();
+
template <class BodyType> bool validate() {
return validate(BodyType::CLASS_ID, BodyType::METHOD_ID);
}
template <class BodyType> void receive() {
- return receive(BodyType::CLASS_ID, BodyType::METHOD_ID);
+ receive(BodyType::CLASS_ID, BodyType::METHOD_ID);
}
};
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQMethodBody.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQMethodBody.cpp?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQMethodBody.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQMethodBody.cpp Fri Feb 2 14:03:10 2007
@@ -56,8 +56,4 @@
decodeContent(buffer);
}
-void AMQMethodBody::send(const MethodContext& context) {
- context.out->send(new AMQFrame(version, context.channelId, this));
-}
-
}} // namespace qpid::framing
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQMethodBody.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQMethodBody.h?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQMethodBody.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQMethodBody.h Fri Feb 2 14:03:10 2007
@@ -57,11 +57,8 @@
return amqpClassId()==T::CLASS_ID && amqpMethodId()==T::METHOD_ID;
}
- /**
- * Wrap this method in a frame and send using the current context.
- * Note the frame takes ownership of the body, it will be deleted.
- */
- virtual void send(const MethodContext& context);
+ /** Return request ID or response correlationID */
+ virtual RequestId getRequestId() const { return 0; }
protected:
static u_int32_t baseSize() { return 4; }
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQRequestBody.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQRequestBody.h?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQRequestBody.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQRequestBody.h Fri Feb 2 14:03:10 2007
@@ -58,8 +58,8 @@
Data& getData() { return data; }
RequestId getRequestId() const { return data.requestId; }
- void setRequestId(RequestId id) { data.requestId=id; }
ResponseId getResponseMark() const { return data.responseMark; }
+ void setRequestId(RequestId id) { data.requestId=id; }
void setResponseMark(ResponseId mark) { data.responseMark=mark; }
protected:
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.cpp?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.cpp Fri Feb 2 14:03:10 2007
@@ -62,11 +62,4 @@
<< ",batch=" << data.batchOffset << "): ";
}
-void AMQResponseBody::send(const MethodContext& context) {
- setRequestId(context.requestId);
- assert(context.out);
- context.out->send(
- new AMQFrame(version, context.channelId, this));
-}
-
}} // namespace qpid::framing
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.h?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.h Fri Feb 2 14:03:10 2007
@@ -62,14 +62,12 @@
void encode(Buffer& buffer) const;
Data& getData() { return data; }
- ResponseId getResponseId() { return data.responseId; }
- RequestId getRequestId() { return data.requestId; }
- BatchOffset getBatchOffset() { return data.batchOffset; }
+ ResponseId getResponseId() const { return data.responseId; }
+ RequestId getRequestId() const { return data.requestId; }
+ BatchOffset getBatchOffset() const { return data.batchOffset; }
void setResponseId(ResponseId id) { data.responseId = id; }
void setRequestId(RequestId id) { data.requestId = id; }
void setBatchOffset(BatchOffset id) { data.batchOffset = id; }
-
- virtual void send(const MethodContext& context);
protected:
static const u_int32_t baseSize() { return AMQMethodBody::baseSize()+20; }
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp Fri Feb 2 14:03:10 2007
@@ -32,12 +32,10 @@
id = i;
out = &o;
version = v;
- context = MethodContext(id, this);
}
-void ChannelAdapter::send(AMQFrame* frame) {
+void ChannelAdapter::send(AMQBody::shared_ptr body) {
assertChannelOpen();
- AMQBody::shared_ptr body = frame->getBody();
switch (body->type()) {
case REQUEST_BODY: {
AMQRequestBody::shared_ptr request =
@@ -52,18 +50,13 @@
break;
}
}
- out->send(frame);
-}
-
-void ChannelAdapter::send(AMQBody::shared_ptr body) {
- send(new AMQFrame(getVersion(), getId(), body));
+ out->send(new AMQFrame(getVersion(), getId(), body));
}
void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) {
assertMethodOk(*request);
responder.received(request->getData());
- context =MethodContext(id, request, this, request->getRequestId());
- handleMethodInContext(request, context);
+ handleMethodInContext(request, MethodContext(this, request));
}
void ChannelAdapter::handleResponse(AMQResponseBody::shared_ptr response) {
@@ -76,8 +69,7 @@
void ChannelAdapter::handleMethod(AMQMethodBody::shared_ptr method) {
assertMethodOk(*method);
- context = MethodContext(id, method, this);
- handleMethodInContext(method, context);
+ handleMethodInContext(method, MethodContext(this, method));
}
void ChannelAdapter::assertMethodOk(AMQMethodBody& method) const {
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h Fri Feb 2 14:03:10 2007
@@ -27,7 +27,7 @@
#include "BodyHandler.h"
#include "Requester.h"
#include "Responder.h"
-#include "OutputHandler.h"
+#include "framing/amqp_types.h"
namespace qpid {
namespace framing {
@@ -37,24 +37,26 @@
/**
* Base class for client and broker channel adapters.
*
- * As BodyHandler:
+ * BodyHandler::handle*()
* - receives frame bodies from the network.
* - Updates request/response data.
* - Dispatches requests with a MethodContext for responses.
*
- * As OutputHandler:
+ * send()
* - Updates request/response ID data.
* - Forwards frame to the peer.
*
* Thread safety: OBJECT UNSAFE. Instances must not be called
* concurrently. AMQP defines channels to be serialized.
*/
-class ChannelAdapter : public BodyHandler, public OutputHandler {
+class ChannelAdapter : public BodyHandler {
public:
/**
*@param output Processed frames are forwarded to this handler.
*/
- ChannelAdapter() : context(0, 0), id(0), out(0) {}
+ ChannelAdapter(ChannelId id_=0, OutputHandler* out_=0,
+ const ProtocolVersion& ver=ProtocolVersion())
+ : id(id_), out(out_), version(ver) {}
/** Initialize the channel adapter. */
void init(ChannelId, OutputHandler&, const ProtocolVersion&);
@@ -63,12 +65,6 @@
const ProtocolVersion& getVersion() const { return version; }
/**
- * Do request/response-id processing and then forward to
- * handler provided to constructor. Response frames should
- * have their request-id set before calling send.
- */
- void send(AMQFrame* frame);
- /**
* Wrap body in a frame and send the frame.
* Takes ownership of body.
*/
@@ -92,9 +88,6 @@
RequestId getRequestInProgress() { return requestInProgress; }
- protected:
- MethodContext context;
-
private:
ChannelId id;
OutputHandler* out;
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/MethodContext.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/MethodContext.h?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/MethodContext.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/MethodContext.h Fri Feb 2 14:03:10 2007
@@ -19,6 +19,8 @@
*
*/
+#include <boost/shared_ptr.hpp>
+
#include "OutputHandler.h"
#include "ProtocolVersion.h"
@@ -29,53 +31,40 @@
class BodyHandler;
class AMQMethodBody;
+class ChannelAdapter;
/**
* Invocation context for an AMQP method.
- * Some of the context information is related to the channel, some
- * to the specific invocation - e.g. requestId.
- *
- * All generated proxy and handler functions take a MethodContext parameter.
- *
- * The user does not need to create MethodContext objects explicitly,
- * the constructor will implicitly create one from a channel ID.
*
- * Other context members are for internal use.
+ * It provides the method being processed and the channel on which
+ * it arrived.
+ *
+ * All Handler functions take a MethodContext as the last parameter.
*/
struct MethodContext
{
+ typedef boost::shared_ptr<AMQMethodBody> BodyPtr;
+
+ MethodContext(ChannelAdapter* ch=0, BodyPtr method=BodyPtr())
+ : channel(ch), methodBody(method) {}
+
/**
- * Passing a integer channel-id in place of a MethodContext
- * will automatically construct the MethodContext.
+ * Channel on which the method being processed arrived.
+ * 0 if the method was constructed by the caller
+ * rather than received from a channel.
*/
- MethodContext(ChannelId channel,
- OutputHandler* output=0, RequestId request=0)
- : channelId(channel), out(output), requestId(request)
- {}
-
- MethodContext(ChannelId channel,
- boost::shared_ptr<AMQMethodBody> method,
- OutputHandler* output=0, RequestId request=0)
- : channelId(channel), out(output), requestId(request),
- methodBody(method)
- {}
-
- /** \internal Channel on which the method is sent. */
- ChannelId channelId;
-
- /** Output handler for responses in this context */
- OutputHandler* out;
-
- /** \internal If we are in the context of processing an incoming request,
- * this is the ID. Otherwise it is 0.
- */
- RequestId requestId;
+ ChannelAdapter* channel;
- /** \internal This is the Method Body itself
- * It's useful for passing around instead of unpacking all its parameters
+ /**
+ * Body of the method being processed.
+ * It's useful for passing around instead of unpacking all its parameters.
+ * It also provides the request ID when constructing a response.
*/
- boost::shared_ptr<AMQMethodBody> methodBody;
+ BodyPtr methodBody;
};
+
+// FIXME aconway 2007-02-01: Method context only required on Handler
+// functions, not on Proxy functions.
}} // namespace qpid::framing
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ProtocolVersion.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ProtocolVersion.cpp?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ProtocolVersion.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ProtocolVersion.cpp Fri Feb 2 14:03:10 2007
@@ -20,10 +20,13 @@
*/
#include <ProtocolVersion.h>
#include <sstream>
+#include "AMQP_HighestVersion.h"
using namespace qpid::framing;
-ProtocolVersion::ProtocolVersion() {}
+ProtocolVersion::ProtocolVersion() {
+ *this = highestProtocolVersion;
+}
ProtocolVersion::ProtocolVersion(u_int8_t _major, u_int8_t _minor) :
major_(_major),
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp Fri Feb 2 14:03:10 2007
@@ -28,6 +28,9 @@
#include <memory>
#include <AMQP_HighestVersion.h>
#include "AMQFrame.h"
+#include "DummyChannel.h"
+#include "broker/Connection.h"
+#include "ProtocolInitiation.h"
using namespace boost;
using namespace qpid::broker;
@@ -36,12 +39,12 @@
using std::string;
using std::queue;
-struct DummyHandler : OutputHandler{
+struct DummyHandler : ConnectionOutputHandler{
std::vector<AMQFrame*> frames;
- virtual void send(AMQFrame* frame){
- frames.push_back(frame);
- }
+ void send(AMQFrame* frame){ frames.push_back(frame); }
+
+ void close() {};
};
@@ -55,6 +58,10 @@
CPPUNIT_TEST(testQueuePolicy);
CPPUNIT_TEST_SUITE_END();
+ Broker::shared_ptr broker;
+ Connection connection;
+ DummyHandler handler;
+
class MockMessageStore : public NullMessageStore
{
struct MethodCall
@@ -135,9 +142,17 @@
public:
+ ChannelTest() :
+ broker(Broker::create()),
+ connection(&handler, *broker)
+ {
+ connection.initiated(new ProtocolInitiation());
+ }
+
+
void testConsumerMgmt(){
Queue::shared_ptr queue(new Queue("my_queue"));
- Channel channel(qpid::framing::highestProtocolVersion, 0, 0, 0);
+ Channel channel(connection, 0, 0, 0);
channel.open();
CPPUNIT_ASSERT(!channel.exists("my_consumer"));
@@ -162,12 +177,10 @@
}
void testDeliveryNoAck(){
- DummyHandler handler;
- Channel channel(qpid::framing::highestProtocolVersion, &handler, 7, 10000);
-
+ Channel channel(connection, 7, 10000);
const string data("abcdefghijklmn");
-
- Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
+ Message::shared_ptr msg(
+ createMessage("test", "my_routing_key", "my_message_id", 14));
addContent(msg, data);
Queue::shared_ptr queue(new Queue("my_queue"));
ConnectionToken* owner(0);
@@ -175,22 +188,25 @@
channel.consume(tag, queue, false, false, owner);
queue->deliver(msg);
- CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size());
- CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[0]->getChannel());
- CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1]->getChannel());
- CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2]->getChannel());
- BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[0]->getBody()));
- AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[1]->getBody()));
- AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody()));
- CPPUNIT_ASSERT(deliver);
- CPPUNIT_ASSERT(contentHeader);
+ CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size());
+ CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0]->getChannel());
+ CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[1]->getChannel());
+ CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[2]->getChannel());
+ CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[3]->getChannel());
+ CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>(
+ handler.frames[0]->getBody().get()));
+ CPPUNIT_ASSERT(dynamic_cast<BasicDeliverBody*>(
+ handler.frames[1]->getBody().get()));
+ CPPUNIT_ASSERT(dynamic_cast<AMQHeaderBody*>(
+ handler.frames[2]->getBody().get()));
+ AMQContentBody* contentBody = dynamic_cast<AMQContentBody*>(
+ handler.frames[3]->getBody().get());
CPPUNIT_ASSERT(contentBody);
CPPUNIT_ASSERT_EQUAL(data, contentBody->getData());
}
void testDeliveryAndRecovery(){
- DummyHandler handler;
- Channel channel(qpid::framing::highestProtocolVersion, &handler, 7, 10000);
+ Channel channel(connection, 7, 10000);
const string data("abcdefghijklmn");
Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
@@ -202,26 +218,32 @@
channel.consume(tag, queue, true, false, owner);
queue->deliver(msg);
- CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size());
- CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[0]->getChannel());
- CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1]->getChannel());
- CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2]->getChannel());
- BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[0]->getBody()));
- AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[1]->getBody()));
- AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody()));
- CPPUNIT_ASSERT(deliver);
- CPPUNIT_ASSERT(contentHeader);
+ CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size());
+ CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0]->getChannel());
+ CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[1]->getChannel());
+ CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[2]->getChannel());
+ CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[3]->getChannel());
+ CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>(
+ handler.frames[0]->getBody().get()));
+ CPPUNIT_ASSERT(dynamic_cast<BasicDeliverBody*>(
+ handler.frames[1]->getBody().get()));
+ CPPUNIT_ASSERT(dynamic_cast<AMQHeaderBody*>(
+ handler.frames[2]->getBody().get()));
+ AMQContentBody* contentBody = dynamic_cast<AMQContentBody*>(
+ handler.frames[3]->getBody().get());
CPPUNIT_ASSERT(contentBody);
CPPUNIT_ASSERT_EQUAL(data, contentBody->getData());
}
void testStaging(){
MockMessageStore store;
- DummyHandler handler;
- Channel channel(qpid::framing::highestProtocolVersion, &handler, 1, 1000/*framesize*/, &store, 10/*staging threshold*/);
+ Channel channel(
+ connection, 1, 1000/*framesize*/, &store, 10/*staging threshold*/);
const string data[] = {"abcde", "fghij", "klmno"};
- Message* msg = new BasicMessage(0, "my_exchange", "my_routing_key", false, false);
+ Message* msg = new BasicMessage(
+ 0, "my_exchange", "my_routing_key", false, false,
+ DummyChannel::basicGetBody());
store.expect();
store.stage(msg);
@@ -309,7 +331,9 @@
Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, u_int64_t contentSize)
{
- BasicMessage* msg = new BasicMessage(0, exchange, routingKey, false, false);
+ BasicMessage* msg = new BasicMessage(
+ 0, exchange, routingKey, false, false,
+ DummyChannel::basicGetBody());
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(contentSize);
msg->setHeader(header);
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/ExchangeTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/ExchangeTest.cpp?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/ExchangeTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/ExchangeTest.cpp Fri Feb 2 14:03:10 2007
@@ -26,8 +26,10 @@
#include <TopicExchange.h>
#include <qpid_test_plugin.h>
#include <iostream>
+#include "BasicGetBody.h"
using namespace qpid::broker;
+using namespace qpid::framing;
using namespace qpid::sys;
class ExchangeTest : public CppUnit::TestCase
@@ -54,7 +56,11 @@
queue.reset();
queue2.reset();
- Message::shared_ptr msgPtr(new BasicMessage(0, "e", "A", true, true));
+ Message::shared_ptr msgPtr(
+ new BasicMessage(
+ 0, "e", "A", true, true,
+ AMQMethodBody::shared_ptr(
+ new BasicGetBody(ProtocolVersion()))));
DeliverableMessage msg(msgPtr);
topic.route(msg, "abc", 0);
direct.route(msg, "abc", 0);
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/FramingTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/FramingTest.cpp?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/FramingTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/FramingTest.cpp Fri Feb 2 14:03:10 2007
@@ -105,7 +105,7 @@
{
std::string a = "hostA";
std::string b = "hostB";
- ConnectionRedirectBody in(version, a, b);
+ ConnectionRedirectBody in(version, 0, a, b);
in.encodeContent(buffer);
buffer.flip();
ConnectionRedirectBody out(version);
@@ -142,7 +142,8 @@
{
std::string a = "hostA";
std::string b = "hostB";
- AMQFrame in(version, 999, new ConnectionRedirectBody(version, a, b));
+ AMQFrame in(version, 999,
+ new ConnectionRedirectBody(version, 0, a, b));
in.encode(buffer);
buffer.flip();
AMQFrame out;
@@ -153,7 +154,7 @@
void testBasicConsumeOkBodyFrame()
{
std::string s = "hostA";
- AMQFrame in(version, 999, new BasicConsumeOkBody(version, s));
+ AMQFrame in(version, 999, new BasicConsumeOkBody(version, 0, s));
in.encode(buffer);
buffer.flip();
AMQFrame out;
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/InMemoryContentTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/InMemoryContentTest.cpp?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/InMemoryContentTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/InMemoryContentTest.cpp Fri Feb 2 14:03:10 2007
@@ -24,6 +24,7 @@
#include <iostream>
#include <list>
#include "AMQFrame.h"
+#include "DummyChannel.h"
using std::list;
using std::string;
@@ -31,13 +32,6 @@
using namespace qpid::broker;
using namespace qpid::framing;
-struct DummyHandler : OutputHandler{
- std::vector<AMQFrame*> frames;
-
- virtual void send(AMQFrame* frame){
- frames.push_back(frame);
- }
-};
class InMemoryContentTest : public CppUnit::TestCase
{
@@ -64,12 +58,21 @@
void refragment(size_t inCount, string* in, size_t outCount, string* out, u_int32_t framesize = 5)
{
InMemoryContent content;
- DummyHandler handler;
- u_int16_t channel = 3;
+ DummyChannel channel(3);
addframes(content, inCount, in);
- content.send(highestProtocolVersion, &handler, channel, framesize);
- check(handler, channel, outCount, out);
+ content.send(channel, framesize);
+ CPPUNIT_ASSERT_EQUAL(outCount, channel.out.frames.size());
+
+ for (unsigned int i = 0; i < outCount; i++) {
+ AMQContentBody::shared_ptr chunk(
+ dynamic_pointer_cast<AMQContentBody>(
+ channel.out.frames[i]->getBody()));
+ CPPUNIT_ASSERT(chunk);
+ CPPUNIT_ASSERT_EQUAL(out[i], chunk->getData());
+ CPPUNIT_ASSERT_EQUAL(
+ ChannelId(3), channel.out.frames[i]->getChannel());
+ }
}
void addframes(InMemoryContent& content, size_t frameCount, string* frameData)
@@ -80,17 +83,7 @@
}
}
- void check(DummyHandler& handler, u_int16_t channel, size_t expectedChunkCount, string* expectedChunks)
- {
- CPPUNIT_ASSERT_EQUAL(expectedChunkCount, handler.frames.size());
- for (unsigned int i = 0; i < expectedChunkCount; i++) {
- AMQContentBody::shared_ptr chunk(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[i]->getBody()));
- CPPUNIT_ASSERT(chunk);
- CPPUNIT_ASSERT_EQUAL(expectedChunks[i], chunk->getData());
- CPPUNIT_ASSERT_EQUAL(channel, handler.frames[i]->getChannel());
- }
- }
};
// Make this test suite a plugin.
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/LazyLoadedContentTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/LazyLoadedContentTest.cpp?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/LazyLoadedContentTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/LazyLoadedContentTest.cpp Fri Feb 2 14:03:10 2007
@@ -26,20 +26,13 @@
#include <list>
#include <sstream>
#include "AMQFrame.h"
-
+#include "DummyChannel.h"
using std::list;
using std::string;
using boost::dynamic_pointer_cast;
using namespace qpid::broker;
using namespace qpid::framing;
-struct DummyHandler : OutputHandler{
- std::vector<AMQFrame*> frames;
-
- virtual void send(AMQFrame* frame){
- frames.push_back(frame);
- }
-};
class LazyLoadedContentTest : public CppUnit::TestCase
@@ -99,21 +92,16 @@
{
TestMessageStore store(in);
LazyLoadedContent content(&store, 0, in.size());
- DummyHandler handler;
- u_int16_t channel = 3;
- content.send(highestProtocolVersion, &handler, channel, framesize);
- check(handler, channel, outCount, out);
- }
-
- void check(DummyHandler& handler, u_int16_t channel, size_t expectedChunkCount, string* expectedChunks)
- {
- CPPUNIT_ASSERT_EQUAL(expectedChunkCount, handler.frames.size());
+ DummyChannel channel(3);
+ content.send(channel, framesize);
+ CPPUNIT_ASSERT_EQUAL(outCount, channel.out.frames.size());
- for (unsigned int i = 0; i < expectedChunkCount; i++) {
- AMQContentBody::shared_ptr chunk(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[i]->getBody()));
+ for (unsigned int i = 0; i < outCount; i++) {
+ AMQContentBody::shared_ptr chunk(dynamic_pointer_cast<AMQContentBody, AMQBody>(channel.out.frames[i]->getBody()));
CPPUNIT_ASSERT(chunk);
- CPPUNIT_ASSERT_EQUAL(expectedChunks[i], chunk->getData());
- CPPUNIT_ASSERT_EQUAL(channel, handler.frames[i]->getChannel());
+ CPPUNIT_ASSERT_EQUAL(out[i], chunk->getData());
+ CPPUNIT_ASSERT_EQUAL(
+ ChannelId(3), channel.out.frames[i]->getChannel());
}
}
};
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/MessageBuilderTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/MessageBuilderTest.cpp?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/MessageBuilderTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/MessageBuilderTest.cpp Fri Feb 2 14:03:10 2007
@@ -26,6 +26,7 @@
#include <qpid_test_plugin.h>
#include <iostream>
#include <memory>
+#include "DummyChannel.h"
using namespace boost;
using namespace qpid::broker;
@@ -36,7 +37,7 @@
{
struct DummyHandler : MessageBuilder::CompletionHandler{
Message::shared_ptr msg;
-
+
virtual void complete(Message::shared_ptr& _msg){
msg = _msg;
}
@@ -48,7 +49,7 @@
Buffer* content;
const u_int32_t contentBufferSize;
- public:
+ public:
void stage(Message* const msg)
{
@@ -98,7 +99,7 @@
}
//dont care about any of the other methods:
- TestMessageStore(u_int32_t _contentBufferSize) : NullMessageStore(false), header(0), content(0),
+ TestMessageStore(u_int32_t _contentBufferSize) : NullMessageStore(), header(0), content(0),
contentBufferSize(_contentBufferSize) {}
~TestMessageStore(){}
};
@@ -116,7 +117,10 @@
DummyHandler handler;
MessageBuilder builder(&handler);
- Message::shared_ptr message(new BasicMessage(0, "test", "my_routing_key", false, false));
+ Message::shared_ptr message(
+ new BasicMessage(
+ 0, "test", "my_routing_key", false, false,
+ DummyChannel::basicGetBody()));
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(0);
@@ -133,7 +137,9 @@
string data1("abcdefg");
- Message::shared_ptr message(new BasicMessage(0, "test", "my_routing_key", false, false));
+ Message::shared_ptr message(
+ new BasicMessage(0, "test", "my_routing_key", false, false,
+ DummyChannel::basicGetBody()));
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(7);
AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
@@ -154,7 +160,9 @@
string data1("abcdefg");
string data2("hijklmn");
- Message::shared_ptr message(new BasicMessage(0, "test", "my_routing_key", false, false));
+ Message::shared_ptr message(
+ new BasicMessage(0, "test", "my_routing_key", false, false,
+ DummyChannel::basicGetBody()));
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(14);
AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
@@ -183,7 +191,9 @@
string data1("abcdefg");
string data2("hijklmn");
- Message::shared_ptr message(new BasicMessage(0, "test", "my_routing_key", false, false));
+ Message::shared_ptr message(
+ new BasicMessage(0, "test", "my_routing_key", false, false,
+ DummyChannel::basicGetBody()));
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(14);
BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(header->getProperties());
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/MessageTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/MessageTest.cpp?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/MessageTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/MessageTest.cpp Fri Feb 2 14:03:10 2007
@@ -23,19 +23,12 @@
#include <iostream>
#include <AMQP_HighestVersion.h>
#include "AMQFrame.h"
+#include "DummyChannel.h"
using namespace boost;
using namespace qpid::broker;
using namespace qpid::framing;
-struct DummyHandler : OutputHandler{
- std::vector<AMQFrame*> frames;
-
- virtual void send(AMQFrame* frame){
- frames.push_back(frame);
- }
-};
-
class MessageTest : public CppUnit::TestCase
{
CPPUNIT_TEST_SUITE(MessageTest);
@@ -52,7 +45,9 @@
string data1("abcdefg");
string data2("hijklmn");
- Message::shared_ptr msg = Message::shared_ptr(new BasicMessage(0, exchange, routingKey, false, false));
+ BasicMessage::shared_ptr msg(
+ new BasicMessage(0, exchange, routingKey, false, false,
+ DummyChannel::basicGetBody()));
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(14);
AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
@@ -69,7 +64,8 @@
msg->encode(buffer);
buffer.flip();
- msg = Message::shared_ptr(new BasicMessage(buffer));
+ msg.reset(new BasicMessage());
+ msg->decode(buffer);
CPPUNIT_ASSERT_EQUAL(exchange, msg->getExchange());
CPPUNIT_ASSERT_EQUAL(routingKey, msg->getRoutingKey());
CPPUNIT_ASSERT_EQUAL(messageId, msg->getHeaderProperties()->getMessageId());
@@ -77,10 +73,11 @@
CPPUNIT_ASSERT_EQUAL(string("xyz"), msg->getHeaderProperties()->getHeaders().getString("abc"));
CPPUNIT_ASSERT_EQUAL((u_int64_t) 14, msg->contentSize());
- DummyHandler handler;
- msg->deliver(&handler, 0, "ignore", 0, 100, &(qpid::framing::highestProtocolVersion));
- CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size());
- AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody()));
+ DummyChannel channel(1);
+ // FIXME aconway 2007-02-02: deliver should take const ProtocolVersion&
+ msg->deliver(channel, "ignore", 0, 100);
+ CPPUNIT_ASSERT_EQUAL((size_t) 3, channel.out.frames.size());
+ AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(channel.out.frames[2]->getBody()));
CPPUNIT_ASSERT(contentBody);
CPPUNIT_ASSERT_EQUAL(data1 + data2, contentBody->getData());
}
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/QueueTest.cpp?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/QueueTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/QueueTest.cpp Fri Feb 2 14:03:10 2007
@@ -22,6 +22,7 @@
#include <QueueRegistry.h>
#include <qpid_test_plugin.h>
#include <iostream>
+#include "DummyChannel.h"
using namespace qpid::broker;
using namespace qpid::sys;
@@ -54,6 +55,12 @@
CPPUNIT_TEST_SUITE_END();
public:
+ Message::shared_ptr message(std::string exchange, std::string routingKey) {
+ return Message::shared_ptr(
+ new BasicMessage(0, exchange, routingKey, true, true,
+ DummyChannel::basicGetBody()));
+ }
+
void testConsumers(){
Queue::shared_ptr queue(new Queue("my_queue", true));
@@ -66,9 +73,9 @@
CPPUNIT_ASSERT_EQUAL(u_int32_t(2), queue->getConsumerCount());
//Test basic delivery:
- Message::shared_ptr msg1 = Message::shared_ptr(new BasicMessage(0, "e", "A", true, true));
- Message::shared_ptr msg2 = Message::shared_ptr(new BasicMessage(0, "e", "B", true, true));
- Message::shared_ptr msg3 = Message::shared_ptr(new BasicMessage(0, "e", "C", true, true));
+ Message::shared_ptr msg1 = message("e", "A");
+ Message::shared_ptr msg2 = message("e", "B");
+ Message::shared_ptr msg3 = message("e", "C");
queue->deliver(msg1);
CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get());
@@ -122,10 +129,9 @@
void testDequeue(){
Queue::shared_ptr queue(new Queue("my_queue", true));
-
- Message::shared_ptr msg1 = Message::shared_ptr(new BasicMessage(0, "e", "A", true, true));
- Message::shared_ptr msg2 = Message::shared_ptr(new BasicMessage(0, "e", "B", true, true));
- Message::shared_ptr msg3 = Message::shared_ptr(new BasicMessage(0, "e", "C", true, true));
+ Message::shared_ptr msg1 = message("e", "A");
+ Message::shared_ptr msg2 = message("e", "B");
+ Message::shared_ptr msg3 = message("e", "C");
Message::shared_ptr received;
queue->deliver(msg1);
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/TxAckTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/TxAckTest.cpp?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/TxAckTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/TxAckTest.cpp Fri Feb 2 14:03:10 2007
@@ -25,6 +25,7 @@
#include <iostream>
#include <list>
#include <vector>
+#include "DummyChannel.h"
using std::list;
using std::vector;
@@ -44,7 +45,7 @@
dequeued.push_back(std::pair<Message*, const string*>(msg, xid));
}
- TestMessageStore() : NullMessageStore(false) {}
+ TestMessageStore() : NullMessageStore() {}
~TestMessageStore(){}
};
@@ -69,7 +70,9 @@
TxAckTest() : queue(new Queue("my_queue", false, &store, 0)), op(acked, deliveries, &xid)
{
for(int i = 0; i < 10; i++){
- Message::shared_ptr msg(new BasicMessage(0, "exchange", "routing_key", false, false));
+ Message::shared_ptr msg(
+ new BasicMessage(0, "exchange", "routing_key", false, false,
+ DummyChannel::basicGetBody()));
msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC)));
msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
messages.push_back(msg);
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/TxPublishTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/TxPublishTest.cpp?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/TxPublishTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/TxPublishTest.cpp Fri Feb 2 14:03:10 2007
@@ -25,6 +25,7 @@
#include <iostream>
#include <list>
#include <vector>
+#include "DummyChannel.h"
using std::list;
using std::pair;
@@ -73,10 +74,12 @@
public:
- TxPublishTest() : queue1(new Queue("queue1", false, &store, 0)),
- queue2(new Queue("queue2", false, &store, 0)),
- msg(new BasicMessage(0, "exchange", "routing_key", false, false)),
- op(msg, &xid)
+ TxPublishTest() :
+ queue1(new Queue("queue1", false, &store, 0)),
+ queue2(new Queue("queue2", false, &store, 0)),
+ msg(new BasicMessage(0, "exchange", "routing_key", false, false,
+ DummyChannel::basicGetBody())),
+ op(msg, &xid)
{
msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC)));
msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
Modified: incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/AmqpMethod.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/AmqpMethod.java?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/AmqpMethod.java (original)
+++ incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/AmqpMethod.java Fri Feb 2 14:03:10 2007
@@ -36,7 +36,7 @@
public AmqpFlagMap clientMethodFlagMap; // Method called on client (<chassis name="server"> in XML)
public AmqpFlagMap serverMethodFlagMap; // Method called on server (<chassis name="client"> in XML)
public AmqpFlagMap isResponseFlagMap;
-
+
public AmqpMethod(String name, LanguageConverter converter)
{
this.name = name;
@@ -49,6 +49,10 @@
isResponseFlagMap = new AmqpFlagMap();
}
+ public boolean isResponse(AmqpVersion version) {
+ return (version == null) ? isResponseFlagMap.isSet() : isResponseFlagMap.isSet(version);
+ }
+
/** Check if this method is named as a response by any other method in the class. */
public void checkForResponse(Element methodElement, AmqpVersion version) {
Element clazz = (Element)methodElement.getParentNode();
Modified: incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/CppGenerator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/CppGenerator.java?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/CppGenerator.java (original)
+++ incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/CppGenerator.java Fri Feb 2 14:03:10 2007
@@ -361,8 +361,7 @@
}
private String baseClass(AmqpMethod method, AmqpVersion version) {
- boolean isResponse = (version == null) ? method.isResponseFlagMap.isSet() : method.isResponseFlagMap.isSet(version);
- String base = isResponse ? "AMQResponseBody":"AMQRequestBody";
+ String base = method.isResponse(version) ? "AMQResponseBody":"AMQRequestBody";
return base;
}
@@ -1009,7 +1008,7 @@
sb.append(" // AMQP Version(s) " + versionSet);
sb.append(cr);
sb.append(indent + "{" + cr);
- sb.append(generateMethodBodyCallContext(thisFieldMap, outerclassName, methodBodyClassName,
+ sb.append(generateMethodBodyCallContext(method, thisFieldMap, outerclassName, methodBodyClassName,
versionConsistentFlag, versionSet, indentSize + tabSize, tabSize));
sb.append(indent + "}" + cr);
sb.append(cr);
@@ -1020,7 +1019,7 @@
return sb.toString();
}
- protected String generateMethodBodyCallContext(AmqpOrdinalFieldMap fieldMap, String outerclassName,
+ protected String generateMethodBodyCallContext(AmqpMethod method, AmqpOrdinalFieldMap fieldMap, String outerclassName,
String methodBodyClassName, boolean versionConsistentFlag, AmqpVersionSet versionSet,
int indentSize, int tabSize)
throws AmqpTypeMappingException
@@ -1030,7 +1029,7 @@
StringBuffer sb = new StringBuffer();
if (versionConsistentFlag)
{
- sb.append(generateMethodBodyCall(fieldMap, methodBodyClassName, null, indentSize, tabSize));
+ sb.append(generateMethodBodyCall(method, fieldMap, methodBodyClassName, null, indentSize, tabSize));
}
else
{
@@ -1042,7 +1041,7 @@
sb.append("else ");
sb.append("if (" + generateVersionCheck(thisVersion) + ")" + cr);
sb.append(indent + "{" + cr);
- sb.append(generateMethodBodyCall(fieldMap, methodBodyClassName, thisVersion,
+ sb.append(generateMethodBodyCall(method, fieldMap, methodBodyClassName, thisVersion,
indentSize + tabSize, tabSize));
sb.append(indent + "}" + cr);
firstOverloadedMethodFlag = false;
@@ -1059,7 +1058,7 @@
return sb.toString();
}
- protected String generateMethodBodyCall(AmqpOrdinalFieldMap fieldMap, String methodBodyClassName,
+ protected String generateMethodBodyCall(AmqpMethod method, AmqpOrdinalFieldMap fieldMap, String methodBodyClassName,
AmqpVersion version, int indentSize, int tabSize)
throws AmqpTypeMappingException
{
@@ -1067,9 +1066,12 @@
String tab = Utils.createSpaces(tabSize);
String namespace = version != null ? version.namespace() + "::" : "";
StringBuffer sb = new StringBuffer();
- sb.append(indent + tab + "(new " + namespace + methodBodyClassName + "( parent->getProtocolVersion()");
+ sb.append(indent+tab+"context.channel->send(new ");
+ sb.append(namespace + methodBodyClassName + "( parent->getProtocolVersion()");
+ if (method.isResponse(version))
+ sb.append(", context.methodBody->getRequestId()");
sb.append(generateMethodParameterList(fieldMap, indentSize + (5*tabSize), true, false, true));
- sb.append("))->send(context);\n");
+ sb.append("));\n");
return sb.toString();
}
@@ -1425,14 +1427,22 @@
String indent = Utils.createSpaces(indentSize);
String tab = Utils.createSpaces(tabSize);
StringBuffer sb = new StringBuffer();
- if (method.fieldMap.size() > 0)
+ if (method.fieldMap.size() > 0 || method.isResponse(version))
{
sb.append(indent + thisClass.name + Utils.firstUpper(method.name) + "Body(const ProtocolVersion& version," + cr);
- sb.append(generateFieldList(method.fieldMap, version, true, false, 8));
- sb.append(indent + tab + ") :" + cr);
- sb.append(indent + tab + baseClass(method, version) + "(version)," + cr);
- sb.append(generateFieldList(method.fieldMap, version, false, true, 8));
- sb.append(indent + "{ }" + cr);
+ if (method.isResponse(version)) {
+ sb.append(indent+tab+"RequestId toRequest");
+ if (method.fieldMap.size() >0)
+ sb.append(",\n");
+ }
+ sb.append(generateFieldList(method.fieldMap, version, true, false, 8));
+ sb.append(indent + tab + ") : " + baseClass(method, version) + "(version");
+ if (method.isResponse(version))
+ sb.append(", 0, toRequest");
+ sb.append(")");
+ if (method.fieldMap.size() > 0)
+ sb.append(", \n" + generateFieldList(method.fieldMap, version, false, true, 8));
+ sb.append(indent + "{ }\n");
}
return sb.toString();
}
Modified: incubator/qpid/branches/qpid.0-9/gentools/templ.cpp/AMQP_ClientProxy.cpp.tmpl
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/gentools/templ.cpp/AMQP_ClientProxy.cpp.tmpl?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/gentools/templ.cpp/AMQP_ClientProxy.cpp.tmpl (original)
+++ incubator/qpid/branches/qpid.0-9/gentools/templ.cpp/AMQP_ClientProxy.cpp.tmpl Fri Feb 2 14:03:10 2007
@@ -30,6 +30,8 @@
#include <AMQP_ClientProxy.h>
#include <AMQFrame.h>
+#include "framing/ChannelAdapter.h"
+
%{MLIST} ${cpc_method_body_include}
namespace qpid {
Modified: incubator/qpid/branches/qpid.0-9/gentools/templ.cpp/AMQP_ServerProxy.cpp.tmpl
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/gentools/templ.cpp/AMQP_ServerProxy.cpp.tmpl?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/gentools/templ.cpp/AMQP_ServerProxy.cpp.tmpl (original)
+++ incubator/qpid/branches/qpid.0-9/gentools/templ.cpp/AMQP_ServerProxy.cpp.tmpl Fri Feb 2 14:03:10 2007
@@ -30,6 +30,8 @@
#include <AMQP_ServerProxy.h>
#include <AMQFrame.h>
+#include "framing/ChannelAdapter.h"
+
%{MLIST} ${spc_method_body_include}
namespace qpid {