You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/08/28 21:38:23 UTC
svn commit: r570538 [2/4] - in /incubator/qpid/trunk/qpid:
cpp/rubygen/templates/ cpp/src/ cpp/src/qpid/broker/ cpp/src/qpid/client/
cpp/src/qpid/cluster/ cpp/src/qpid/framing/ cpp/src/tests/ python/
python/qpid/ python/tests_0-10/ specs/
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp Tue Aug 28 12:38:17 2007
@@ -20,7 +20,10 @@
*/
#include "SemanticHandler.h"
+
+#include "boost/format.hpp"
#include "BrokerAdapter.h"
+#include "MessageDelivery.h"
#include "qpid/framing/ChannelAdapter.h"
#include "qpid/framing/ChannelCloseOkBody.h"
#include "qpid/framing/ExecutionCompleteBody.h"
@@ -32,18 +35,16 @@
using namespace qpid::sys;
SemanticHandler::SemanticHandler(ChannelId id, Connection& c) :
- connection(c),
- channel(c, *this, id, &c.broker.getStore())
+ connection(c), channel(c, *this, id)
{
init(id, connection.getOutput(), connection.getVersion());
adapter = std::auto_ptr<BrokerAdapter>(new BrokerAdapter(channel, connection, connection.broker, *this));
}
-
void SemanticHandler::handle(framing::AMQFrame& frame)
{
- //TODO: assembly etc when move to 0-10 framing
- //
+ //TODO: assembly for method and headers
+
//have potentially three separate tracks at this point:
//
// (1) execution controls
@@ -51,46 +52,43 @@
// (3) data i.e. content-bearing commands
//
//framesets on each can be interleaved. framesets on the latter
- //two share a command-id sequence.
+ //two share a command-id sequence. controls on the first track are
+ //used to communicate details about that command-id sequence.
//
//need to decide what to do if a frame on the command track
//arrives while a frameset on the data track is still
//open. execute it (i.e. out-of order execution with respect to
- //the command id sequence) or queue it up.
+ //the command id sequence) or queue it up?
- //if ready to execute (i.e. if segment is complete or frame is
- //message content):
- handleBody(frame.getBody());
-}
-
-//ChannelAdapter virtual methods:
-void SemanticHandler::handleMethod(framing::AMQMethodBody* method)
-{
- try {
- if (!method->invoke(this)) {
- //temporary hack until channel management is moved to its own handler:
- if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) {
- ++(incoming.lwm);
- }
+ try{
- //else do the usual:
- handleL4(method);
- //(if the frameset is complete) we can move the execution-mark
- //forward
-
- //temporary hack until channel management is moved to its own handler:
- if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) {
- //TODO: need to account for async store opreations
- //when this command is a message publication
- ++(incoming.hwm);
+ TrackId track = getTrack(frame);//will be replaced by field in 0-10 frame header
+
+ switch(track) {
+ case SESSION_CONTROL_TRACK://TODO: L2 should be handled by separate handler
+ handleL2(frame.castBody<AMQMethodBody>());
+ break;
+ case EXECUTION_CONTROL_TRACK:
+ handleL3(frame.castBody<AMQMethodBody>());
+ break;
+ case MODEL_COMMAND_TRACK:
+ if (!isOpen()) {
+ throw ConnectionException(504, (boost::format("Attempt to use unopened channel: %g") % getId()).str());
}
-
- //note: need to be more sophisticated than this if we execute
- //commands that arrive within an active message frameset (that
- //can't happen until 0-10 framing is implemented)
+ handleCommand(frame.castBody<AMQMethodBody>());
+ break;
+ case MODEL_CONTENT_TRACK:
+ handleContent(frame);
+ break;
}
+
+ }catch(const ChannelException& e){
+ adapter->getProxy().getChannel().close(e.code, e.toString(), getClassId(frame), getMethodId(frame));
+ connection.closeChannel(getId());
+ }catch(const ConnectionException& e){
+ connection.close(e.code, e.toString(), getClassId(frame), getMethodId(frame));
}catch(const std::exception& e){
- connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId());
+ connection.close(541/*internal error*/, e.what(), getClassId(frame), getMethodId(frame));
}
}
@@ -102,7 +100,6 @@
outgoing.lwm = mark;
//ack messages:
channel.ackCumulative(mark.getValue());
- //std::cout << "[" << this << "] acknowledged: " << mark << std::endl;
}
if (range.size() % 2) { //must be even number
throw ConnectionException(530, "Received odd number of elements in ranged mark");
@@ -116,7 +113,6 @@
void SemanticHandler::flush()
{
//flush doubles as a sync to begin with - send an execution.complete
- incoming.lwm = incoming.hwm;
if (isOpen()) {
Mutex::ScopedLock l(outLock);
ChannelAdapter::send(ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet()));
@@ -142,52 +138,59 @@
//never actually sent by client at present
}
-void SemanticHandler::handleL4(framing::AMQMethodBody* method)
+void SemanticHandler::handleCommand(framing::AMQMethodBody* method)
{
- try{
- if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) {
- if (!method->isA<ChannelCloseOkBody>()) {
- std::stringstream out;
- out << "Attempt to use unopened channel: " << getId();
- throw ConnectionException(504, out.str());
- }
- } else {
- InvocationVisitor v(adapter.get());
- method->accept(v);
- if (!v.wasHandled()) {
- throw ConnectionException(540, "Not implemented");
- } else if (v.hasResult()) {
- ChannelAdapter::send(ExecutionResultBody(getVersion(), incoming.lwm.getValue(), v.getResult()));
- }
- }
- }catch(const ChannelException& e){
- adapter->getProxy().getChannel().close(
- e.code, e.toString(),
- method->amqpClassId(), method->amqpMethodId());
- connection.closeChannel(getId());
- }catch(const ConnectionException& e){
- connection.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId());
+ ++(incoming.lwm);
+ InvocationVisitor v(adapter.get());
+ method->accept(v);
+ //TODO: need to account for async store operations and interleaving
+ ++(incoming.hwm);
+
+ if (!v.wasHandled()) {
+ throw ConnectionException(540, "Not implemented");
+ } else if (v.hasResult()) {
+ ChannelAdapter::send(ExecutionResultBody(getVersion(), incoming.lwm.getValue(), v.getResult()));
}
}
-bool SemanticHandler::isOpen() const
-{
- return channel.isOpen();
+/**
+ * Handles a method frame on the session control track (L2).
+ *
+ * The method is invoked on the adapter's channel handler when it is a
+ * channel-open, or when this channel is already open. On a channel that is
+ * not open, a channel-close-ok is dropped without further action and any
+ * other method raises ConnectionException(504).
+ */
+void SemanticHandler::handleL2(framing::AMQMethodBody* method)
+{
+    const bool dispatchable = method->isA<ChannelOpenBody>() || isOpen();
+    if (dispatchable) {
+        method->invoke(adapter->getChannelHandler());
+        return;
+    }
+    //channel is not open: only a close-ok may be ignored quietly
+    if (method->isA<ChannelCloseOkBody>()) {
+        return;
+    }
+    throw ConnectionException(504, (boost::format("Attempt to use unopened channel: %g") % getId()).str());
}
-void SemanticHandler::handleHeader(qpid::framing::AMQHeaderBody* body)
+void SemanticHandler::handleL3(framing::AMQMethodBody* method)
{
- channel.handleHeader(body);
+ if (!method->invoke(this)) {
+ throw ConnectionException(540, "Not implemented");
+ }
}
-void SemanticHandler::handleContent(qpid::framing::AMQContentBody* body)
+void SemanticHandler::handleContent(AMQFrame& frame)
{
- channel.handleContent(body);
+ Message::shared_ptr msg(msgBuilder.getMessage());
+ if (!msg) {//start of frameset will be indicated by frame flags
+ msgBuilder.start(++(incoming.lwm));
+ msg = msgBuilder.getMessage();
+ }
+ msgBuilder.handle(frame);
+ if (msg->getFrames().isComplete()) {//end of frameset will be indicated by frame flags
+ msg->setPublisher(&connection);
+ channel.handle(msg);
+ msgBuilder.end();
+ //TODO: need to account for async store operations and interleaving
+ ++(incoming.hwm);
+ }
}
-void SemanticHandler::handleHeartbeat(qpid::framing::AMQHeartbeatBody* body)
-{
- channel.handleHeartbeat(body);
+bool SemanticHandler::isOpen() const
+{ //open state is simply that of the channel member; handle(), handleL2() and flush() consult it before acting
+ return channel.isOpen();
}
DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token)
@@ -195,14 +198,13 @@
Mutex::ScopedLock l(outLock);
SequenceNumber copy(outgoing.hwm);
++copy;
- msg->deliver(*this, copy.getValue(), token, connection.getFrameMax());
- //std::cout << "[" << this << "] delivered: " << outgoing.hwm.getValue() << std::endl;
+ MessageDelivery::deliver(msg, *this, copy.getValue(), token, connection.getFrameMax());
return outgoing.hwm.getValue();
}
void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag)
{
- msg->deliver(*this, tag, token, connection.getFrameMax());
+ MessageDelivery::deliver(msg, *this, tag, token, connection.getFrameMax());
}
void SemanticHandler::send(const AMQBody& body)
@@ -214,3 +216,49 @@
}
ChannelAdapter::send(body);
}
+
+/**
+ * Returns the AMQP class id of the method carried by the frame, or 0 when
+ * the frame body is not a method body. Used to fill in the class id when
+ * closing the channel or connection after an error in handle().
+ */
+uint16_t SemanticHandler::getClassId(const AMQFrame& frame)
+{
+    if (frame.getBody()->type() != METHOD_BODY) {
+        return 0;
+    }
+    return frame.castBody<AMQMethodBody>()->amqpClassId();
+}
+
+/**
+ * Returns the AMQP method id of the method carried by the frame, or 0 when
+ * the frame body is not a method body. Used to fill in the method id when
+ * closing the channel or connection after an error in handle().
+ */
+uint16_t SemanticHandler::getMethodId(const AMQFrame& frame)
+{
+    if (frame.getBody()->type() != METHOD_BODY) {
+        return 0;
+    }
+    return frame.castBody<AMQMethodBody>()->amqpMethodId();
+}
+
+/**
+ * Classifies a frame into one of the tracks that handle() dispatches on.
+ *
+ *  - header and content bodies              -> MODEL_CONTENT_TRACK
+ *  - content-bearing methods                -> MODEL_CONTENT_TRACK
+ *  - methods in ChannelOpenBody's class     -> SESSION_CONTROL_TRACK
+ *  - methods in ExecutionCompleteBody's class -> EXECUTION_CONTROL_TRACK
+ *  - every other method                     -> MODEL_COMMAND_TRACK
+ *
+ * Any other body type throws Exception("Could not determine track").
+ */
+SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame)
+{
+    //will be replaced by field in 0-10 frame header
+    const uint8_t bodyType = frame.getBody()->type();
+    if (bodyType == HEADER_BODY || bodyType == CONTENT_BODY) {
+        return MODEL_CONTENT_TRACK;
+    }
+    if (bodyType != METHOD_BODY) {
+        throw Exception("Could not determine track");
+    }
+
+    const AMQMethodBody* method = frame.castBody<AMQMethodBody>();
+    if (method->isContentBearing()) {
+        return MODEL_CONTENT_TRACK;
+    }
+
+    //the class of a non-content method decides between the control tracks
+    const uint16_t classId = method->amqpClassId();
+    if (classId == ChannelOpenBody::CLASS_ID) {
+        return SESSION_CONTROL_TRACK;
+    }
+    if (classId == ExecutionCompleteBody::CLASS_ID) {
+        return EXECUTION_CONTROL_TRACK;
+    }
+    return MODEL_COMMAND_TRACK;
+}
+
+/* ChannelAdapter virtual methods, no longer used: SemanticHandler::handle()
+ * now dispatches each frame by track (see getTrack), so these overrides are
+ * kept only as empty no-ops. */
+void SemanticHandler::handleMethod(framing::AMQMethodBody*){}
+
+void SemanticHandler::handleHeader(qpid::framing::AMQHeaderBody*) {}
+
+void SemanticHandler::handleContent(qpid::framing::AMQContentBody*) {}
+
+void SemanticHandler::handleHeartbeat(qpid::framing::AMQHeartbeatBody*) {}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h Tue Aug 28 12:38:17 2007
@@ -25,6 +25,7 @@
#include "BrokerChannel.h"
#include "Connection.h"
#include "DeliveryAdapter.h"
+#include "MessageBuilder.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/AMQP_ServerOperations.h"
#include "qpid/framing/FrameHandler.h"
@@ -55,8 +56,17 @@
framing::Window incoming;
framing::Window outgoing;
sys::Mutex outLock;
+ MessageBuilder msgBuilder;
- void handleL4(framing::AMQMethodBody* method);
+ enum TrackId {SESSION_CONTROL_TRACK, EXECUTION_CONTROL_TRACK, MODEL_COMMAND_TRACK, MODEL_CONTENT_TRACK};
+ TrackId getTrack(const framing::AMQFrame& frame);
+ uint16_t getClassId(const framing::AMQFrame& frame);
+ uint16_t getMethodId(const framing::AMQFrame& frame);
+
+ void handleL3(framing::AMQMethodBody* method);
+ void handleL2(framing::AMQMethodBody* method);
+ void handleCommand(framing::AMQMethodBody* method);
+ void handleContent(framing::AMQFrame& frame);
//ChannelAdapter virtual methods:
void handleMethod(framing::AMQMethodBody* method);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h Tue Aug 28 12:38:17 2007
@@ -25,7 +25,6 @@
#include <vector>
#include "BrokerExchange.h"
#include "qpid/framing/FieldTable.h"
-#include "BrokerMessage.h"
#include "qpid/sys/Monitor.h"
#include "BrokerQueue.h"
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h Tue Aug 28 12:38:17 2007
@@ -24,10 +24,10 @@
#include <algorithm>
#include <functional>
#include <list>
+#include "BrokerQueue.h"
#include "Deliverable.h"
-#include "BrokerMessage.h"
+#include "Message.h"
#include "MessageStore.h"
-#include "BrokerQueue.h"
#include "TxOp.h"
namespace qpid {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp Tue Aug 28 12:38:17 2007
@@ -75,7 +75,7 @@
id = _id;
setState(OPENING);
- AMQFrame f(version, id, ChannelOpenBody(version));
+ AMQFrame f(id, ChannelOpenBody(version));
out(f);
std::set<int> states;
@@ -90,7 +90,7 @@
void ChannelHandler::close(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId)
{
setState(CLOSING);
- AMQFrame f(version, id, ChannelCloseBody(version, code, message, classId, methodId));
+ AMQFrame f(id, ChannelCloseBody(version, code, message, classId, methodId));
out(f);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp Tue Aug 28 12:38:17 2007
@@ -181,8 +181,8 @@
if (response.isA<BasicGetEmptyBody>()) {
return false;
} else {
- ReceivedContent::shared_ptr content = gets.pop();
- content->populate(msg);
+ FrameSet::shared_ptr content = gets.pop();
+ msg.populate(*content);
return true;
}
}
@@ -232,13 +232,13 @@
void Channel::run() {
try {
while (true) {
- ReceivedContent::shared_ptr content = session->get();
+ FrameSet::shared_ptr content = session->get();
//need to dispatch this to the relevant listener:
if (content->isA<BasicDeliverBody>()) {
ConsumerMap::iterator i = consumers.find(content->as<BasicDeliverBody>()->getConsumerTag());
if (i != consumers.end()) {
Message msg;
- content->populate(msg);
+ msg.populate(*content);
i->second.listener->received(msg);
} else {
QPID_LOG(warning, "Dropping message for unrecognised consumer: " << content->getMethod());
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h Tue Aug 28 12:38:17 2007
@@ -83,7 +83,7 @@
std::auto_ptr<Session> session;
SessionCore::shared_ptr sessionCore;
framing::ChannelId channelId;
- BlockingQueue<ReceivedContent::shared_ptr> gets;
+ BlockingQueue<framing::FrameSet::shared_ptr> gets;
framing::Uuid uniqueId;
uint32_t nameCounter;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientMessage.h?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientMessage.h Tue Aug 28 12:38:17 2007
@@ -23,8 +23,13 @@
*/
#include <string>
#include "qpid/framing/BasicHeaderProperties.h"
+#include "qpid/framing/FrameSet.h"
#include "qpid/framing/MethodContent.h"
+#include "qpid/framing/BasicDeliverBody.h"
+#include "qpid/framing/BasicGetOkBody.h"
+#include "qpid/framing/MessageTransferBody.h"
+
namespace qpid {
namespace client {
@@ -54,6 +59,17 @@
void setRedelivered(bool _redelivered){ redelivered = _redelivered; }
const HeaderProperties& getMethodHeaders() const { return *this; }
+
+
+    //TODO: move this elsewhere (GRS 24/08/2007)
+    /**
+     * Fills this message from a received frameset: when the frameset's
+     * headers carry BasicHeaderProperties they are copied onto this
+     * message, then the data member is handed to frameset.getContent()
+     * (presumably to be filled with the body content).
+     *
+     * NOTE(review): the result of frameset.getHeaders() is dereferenced
+     * unchecked -- confirm it cannot be null for framesets passed here.
+     */
+    void populate(framing::FrameSet& frameset)
+    {
+        if (const BasicHeaderProperties* headerProps = frameset.getHeaders()->get<BasicHeaderProperties>()) {
+            BasicHeaderProperties::copy<Message, BasicHeaderProperties>(*this, *headerProps);
+        }
+        frameset.getContent(data);
+    }
private:
std::string data;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp Tue Aug 28 12:38:17 2007
@@ -109,7 +109,7 @@
void ConnectionHandler::send(const framing::AMQBody& body)
{
- AMQFrame f(ProtocolVersion(), 0, body);
+ AMQFrame f(0, body);
out(f);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Tue Aug 28 12:38:17 2007
@@ -107,7 +107,7 @@
void ConnectionImpl::idleOut()
{
- AMQFrame frame(version, 0, new AMQHeartbeatBody());
+ AMQFrame frame(0, new AMQHeartbeatBody());
connector->send(frame);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Tue Aug 28 12:38:17 2007
@@ -180,7 +180,7 @@
inbuf.move(received);
inbuf.flip();//position = 0, limit = total data read
- AMQFrame frame(version);
+ AMQFrame frame;
while(frame.decode(inbuf)){
QPID_LOG(trace, "RECV: " << frame);
input->received(frame);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp Tue Aug 28 12:38:17 2007
@@ -64,9 +64,9 @@
if (!invoke(body, this)) {
if (isContentFrame(frame)) {
if (!arriving) {
- arriving = ReceivedContent::shared_ptr(new ReceivedContent(++incoming.hwm));
+ arriving = FrameSet::shared_ptr(new FrameSet(++incoming.hwm));
}
- arriving->append(body);
+ arriving->append(frame);
if (arriving->isComplete()) {
received.push(arriving);
arriving.reset();
@@ -123,7 +123,7 @@
void ExecutionHandler::sendFlush()
{
- AMQFrame frame(version, 0, ExecutionFlushBody());
+ AMQFrame frame(0, ExecutionFlushBody());
out(frame);
}
@@ -139,8 +139,7 @@
correlation.listen(g);
}
- AMQFrame frame(version, 0/*id will be filled in be channel handler*/,
- command);
+ AMQFrame frame(0/*id will be filled in be channel handler*/, command);
out(frame);
}
@@ -149,10 +148,10 @@
{
send(command, f, g);
- AMQHeaderBody header(BASIC);
- BasicHeaderProperties::copy(*static_cast<BasicHeaderProperties*>(header.getProperties()), headers);
- header.setContentSize(data.size());
- AMQFrame h(version, 0, header);
+ AMQHeaderBody header;
+ BasicHeaderProperties::copy(*header.get<BasicHeaderProperties>(true), headers);
+ header.get<BasicHeaderProperties>(true)->setContentLength(data.size());
+ AMQFrame h(0, header);
out(h);
u_int64_t data_length = data.length();
@@ -160,7 +159,7 @@
//frame itself uses 8 bytes
u_int32_t frag_size = maxFrameSize - 8;
if(data_length < frag_size){
- AMQFrame frame(version, 0, AMQContentBody(data));
+ AMQFrame frame(0, AMQContentBody(data));
out(frame);
}else{
u_int32_t offset = 0;
@@ -168,7 +167,7 @@
while (remaining > 0) {
u_int32_t length = remaining > frag_size ? frag_size : remaining;
string frag(data.substr(offset, length));
- AMQFrame frame(version, 0, AMQContentBody(frag));
+ AMQFrame frame(0, AMQContentBody(frag));
out(frame);
offset += length;
remaining = data_length - offset;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h Tue Aug 28 12:38:17 2007
@@ -23,12 +23,12 @@
#include <queue>
#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/FrameSet.h"
#include "qpid/framing/SequenceNumber.h"
#include "BlockingQueue.h"
#include "ChainableFrameHandler.h"
#include "CompletionTracker.h"
#include "Correlator.h"
-#include "ReceivedContent.h"
namespace qpid {
namespace client {
@@ -39,7 +39,7 @@
{
framing::Window incoming;
framing::Window outgoing;
- ReceivedContent::shared_ptr arriving;
+ framing::FrameSet::shared_ptr arriving;
Correlator correlation;
CompletionTracker completion;
framing::ProtocolVersion version;
@@ -52,7 +52,7 @@
void sync();
public:
- BlockingQueue<ReceivedContent::shared_ptr> received;
+ BlockingQueue<framing::FrameSet::shared_ptr> received;
ExecutionHandler(uint64_t maxFrameSize = 65536);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp Tue Aug 28 12:38:17 2007
@@ -77,7 +77,7 @@
return Response(f);
}
-ReceivedContent::shared_ptr SessionCore::get()
+FrameSet::shared_ptr SessionCore::get()
{
return l3.received.pop();
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h Tue Aug 28 12:38:17 2007
@@ -25,11 +25,11 @@
#include <boost/shared_ptr.hpp>
#include "qpid/framing/AMQMethodBody.h"
#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/FrameSet.h"
#include "qpid/framing/MethodContent.h"
#include "ChannelHandler.h"
#include "ExecutionHandler.h"
#include "FutureFactory.h"
-#include "ReceivedContent.h"
#include "Response.h"
namespace qpid {
@@ -49,7 +49,7 @@
SessionCore(uint16_t id, boost::shared_ptr<framing::FrameHandler> out, uint64_t maxFrameSize);
Response send(const framing::AMQMethodBody& method, bool expectResponse = false);
Response send(const framing::AMQMethodBody& method, const framing::MethodContent& content, bool expectResponse = false);
- ReceivedContent::shared_ptr get();
+ framing::FrameSet::shared_ptr get();
uint16_t getId() const { return id; }
void setSync(bool);
bool isSync();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Aug 28 12:38:17 2007
@@ -94,8 +94,7 @@
}
void Cluster::notify() {
- AMQFrame frame(ProtocolVersion(), 0,
- ClusterNotifyBody(ProtocolVersion(), url));
+ AMQFrame frame(0, ClusterNotifyBody(ProtocolVersion(), url));
handle(frame);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp Tue Aug 28 12:38:17 2007
@@ -51,7 +51,7 @@
//
BrokerHandler(Broker& broker) :
connection(0, broker),
- channel(connection, *this, 1, 0),
+ channel(connection, *this, 1),
adapter(channel, connection, broker, *this) {}
void handle(AMQFrame& frame) {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.h?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.h Tue Aug 28 12:38:17 2007
@@ -38,6 +38,7 @@
inline virtual ~AMQContentBody(){}
inline uint8_t type() const { return CONTENT_BODY; };
inline const string& getData() const { return data; }
+ inline string& getData() { return data; }
uint32_t size() const;
void encode(Buffer& buffer) const;
void decode(Buffer& buffer, uint32_t size);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQDataBlock.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQDataBlock.h?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQDataBlock.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQDataBlock.h Tue Aug 28 12:38:17 2007
@@ -30,7 +30,7 @@
{
public:
virtual ~AMQDataBlock() {}
- virtual void encode(Buffer& buffer) = 0;
+ virtual void encode(Buffer& buffer) const = 0;
virtual bool decode(Buffer& buffer) = 0;
virtual uint32_t size() const = 0;
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp Tue Aug 28 12:38:17 2007
@@ -70,7 +70,7 @@
return boost::apply_visitor(GetBodyVisitor(), const_cast<Variant&>(body));
}
-void AMQFrame::encode(Buffer& buffer)
+void AMQFrame::encode(Buffer& buffer) const
{
buffer.putOctet(getBody()->type());
buffer.putShort(channel);
@@ -80,8 +80,11 @@
}
uint32_t AMQFrame::size() const{
- return 1/*type*/ + 2/*channel*/ + 4/*body size*/ +
- boost::apply_visitor(SizeVisitor(), body) + 1/*0xCE*/;
+ return frameOverhead() + boost::apply_visitor(SizeVisitor(), body);
+}
+
+/**
+ * Number of bytes a frame adds around its body: the type octet, the channel
+ * short, the body-size long and the trailing frame-end octet.
+ */
+uint32_t AMQFrame::frameOverhead() {
+    const uint32_t typeOctet = 1;
+    const uint32_t channelShort = 2;
+    const uint32_t bodySizeLong = 4;
+    const uint32_t frameEndOctet = 1; /*0xCE*/
+    return typeOctet + channelShort + bodySizeLong + frameEndOctet;
}
bool AMQFrame::decode(Buffer& buffer)
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h Tue Aug 28 12:38:17 2007
@@ -37,14 +37,14 @@
class AMQFrame : public AMQDataBlock
{
public:
- AMQFrame(ProtocolVersion=ProtocolVersion()) {}
+ AMQFrame() : channel(0) {}
/** Construct a frame with a copy of b */
- AMQFrame(ProtocolVersion, ChannelId c, const AMQBody* b) : channel(c) {
+ AMQFrame(ChannelId c, const AMQBody* b) : channel(c) {
setBody(*b);
}
- AMQFrame(ProtocolVersion, ChannelId c, const AMQBody& b) : channel(c) {
+ AMQFrame(ChannelId c, const AMQBody& b) : channel(c) {
setBody(b);
}
@@ -52,21 +52,26 @@
void setChannel(ChannelId c) { channel = c; }
AMQBody* getBody();
- const AMQBody* getBody() const;
+ const AMQBody* getBody() const;
/** Copy a body instance to the frame */
void setBody(const AMQBody& b) { CopyVisitor cv(*this); b.accept(cv); }
/** Convenience template to cast the body to an expected type. */
template <class T> T* castBody() {
- boost::polymorphic_downcast<T*>(getBody());
+ return boost::polymorphic_downcast<T*>(getBody());
+ }
+
+ template <class T> const T* castBody() const {
+ return boost::polymorphic_downcast<const T*>(getBody());
}
bool empty() { return boost::get<boost::blank>(&body); }
- void encode(Buffer& buffer);
+ void encode(Buffer& buffer) const;
bool decode(Buffer& buffer);
uint32_t size() const;
+ static uint32_t frameOverhead();
private:
struct CopyVisitor : public AMQBodyConstVisitor {
@@ -77,7 +82,7 @@
void visit(const AMQHeartbeatBody& x) { frame.body=x; }
void visit(const AMQMethodBody& x) { frame.body=MethodHolder(x); }
};
- friend struct CopyVisitor;
+ friend struct CopyVisitor;
typedef boost::variant<boost::blank,
AMQHeaderBody,
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQHeaderBody.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQHeaderBody.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQHeaderBody.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQHeaderBody.cpp Tue Aug 28 12:38:17 2007
@@ -19,37 +19,65 @@
*
*/
#include "AMQHeaderBody.h"
-#include "qpid/QpidError.h"
-#include "BasicHeaderProperties.h"
+#include "qpid/Exception.h"
+#include "qpid/log/Statement.h"
-qpid::framing::AMQHeaderBody::AMQHeaderBody(int) : weight(0), contentSize(0) {}
+qpid::framing::AMQHeaderBody::AMQHeaderBody() {}
-qpid::framing::AMQHeaderBody::AMQHeaderBody() : weight(0), contentSize(0){}
-
-qpid::framing::AMQHeaderBody::~AMQHeaderBody(){}
+qpid::framing::AMQHeaderBody::~AMQHeaderBody() {}
uint32_t qpid::framing::AMQHeaderBody::size() const{
- return 12 + properties.size();
+ CalculateSize visitor;
+ for_each(properties.begin(), properties.end(), boost::apply_visitor(visitor));
+ return visitor.totalSize() + (properties.size() * (2/*type codes*/ + 4/*size*/));
}
void qpid::framing::AMQHeaderBody::encode(Buffer& buffer) const {
- buffer.putShort(properties.classId());
- buffer.putShort(weight);
- buffer.putLongLong(contentSize);
- properties.encode(buffer);
+ Encode visitor(buffer);
+ for_each(properties.begin(), properties.end(), boost::apply_visitor(visitor));
+}
+
+// Decodes the property segments held in the next 'size' bytes of the buffer.
+// Each segment is a 4 byte length, a 2 byte type code and then the encoded
+// properties; the length counts the type code but not the length field itself.
+// Throws Exception for a segment that is too short or of an unknown type.
+void qpid::framing::AMQHeaderBody::decode(Buffer& buffer, uint32_t size){
+    uint32_t limit = buffer.available() - size;
+    while (buffer.available() > limit + 2) {
+        uint32_t len = buffer.getLong();
+        uint16_t type = buffer.getShort();
+        //len is read from the buffer and must at least cover the type code
+        //just consumed, otherwise 'len - 2' below would wrap around
+        if (len < 2) {
+            throw Exception(QPID_MSG("Invalid property segment length: " << len));
+        }
+        //The following switch could be generated as the number of options increases:
+        switch(type) {
+        case BasicHeaderProperties::TYPE:
+            decode(BasicHeaderProperties(), buffer, len - 2);
+            break;
+        case MessageProperties::TYPE:
+            decode(MessageProperties(), buffer, len - 2);
+            break;
+        case DeliveryProperties::TYPE:
+            decode(DeliveryProperties(), buffer, len - 2);
+            break;
+        default:
+            //TODO: should just skip over them keeping them for later dispatch as is
+            throw Exception(QPID_MSG("Unexpected property type: " << type));
+        }
+    }
}
-void qpid::framing::AMQHeaderBody::decode(Buffer& buffer, uint32_t bufSize){
- buffer.getShort(); // Ignore classId
- weight = buffer.getShort();
- contentSize = buffer.getLongLong();
- properties.decode(buffer, bufSize - 12);
+// Returns the content length recorded in the MessageProperties if present,
+// otherwise the one in the BasicHeaderProperties, otherwise 0.
+uint64_t qpid::framing::AMQHeaderBody::getContentLength() const
+{
+    if (const MessageProperties* message = get<MessageProperties>())
+        return message->getContentLength();
+
+    if (const BasicHeaderProperties* basic = get<BasicHeaderProperties>())
+        return basic->getContentLength();
+
+    return 0;
}
void qpid::framing::AMQHeaderBody::print(std::ostream& out) const
{
- out << "header (" << size() << " bytes)" << " content_size=" << getContentSize();
- out << ", message_id=" << properties.getMessageId();
- out << ", delivery_mode=" << (int) properties.getDeliveryMode();
- out << ", headers=" << properties.getHeaders();
+ out << "header (" << size() << " bytes)";
+ out << "; properties={";
+ Print visitor(out);
+ for_each(properties.begin(), properties.end(), boost::apply_visitor(visitor));
+ out << "}";
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQHeaderBody.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQHeaderBody.h?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQHeaderBody.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQHeaderBody.h Tue Aug 28 12:38:17 2007
@@ -22,6 +22,12 @@
#include "AMQBody.h"
#include "Buffer.h"
#include "BasicHeaderProperties.h"
+#include "qpid/framing/DeliveryProperties.h"
+#include "qpid/framing/MessageProperties.h"
+#include <iostream>
+#include <vector>
+#include <boost/variant.hpp>
+#include <boost/variant/get.hpp>
#ifndef _AMQHeaderBody_
#define _AMQHeaderBody_
@@ -31,24 +37,85 @@
class AMQHeaderBody : public AMQBody
{
- BasicHeaderProperties properties;
- uint16_t weight;
- uint64_t contentSize;
- public:
- AMQHeaderBody(int classId);
+ typedef std::vector< boost::variant<BasicHeaderProperties, DeliveryProperties, MessageProperties> > PropertyList;
+
+ PropertyList properties;
+
+    //fills the given properties object from 'size' bytes of the buffer and
+    //appends the result to the property list
+    template <class T> void decode(T props, Buffer& source, uint32_t size)
+    {
+        props.decode(source, size);
+        properties.push_back(props);
+    }
+
+    //visitor that writes a property set to the buffer as a length (which
+    //includes the type code), the type code and then the encoded properties
+    class Encode : public boost::static_visitor<> {
+        Buffer& out;
+    public:
+        Encode(Buffer& target) : out(target) {}
+
+        template <class T> void operator()(T& props) const {
+            out.putLong(props.size() + 2/*typecode*/);
+            out.putShort(T::TYPE);
+            props.encode(out);
+        }
+    };
+
+    //visitor that accumulates the size() of every property set visited
+    class CalculateSize : public boost::static_visitor<> {
+        uint32_t total;
+    public:
+        CalculateSize() : total(0) {}
+
+        template <class T> void operator()(T& props) {
+            total += props.size();
+        }
+
+        //sum of the sizes seen so far
+        uint32_t totalSize() {
+            return total;
+        }
+    };
+
+    //visitor that streams each property set visited to the given ostream
+    class Print : public boost::static_visitor<> {
+        std::ostream& stream;
+    public:
+        Print(std::ostream& target) : stream(target) {}
+
+        template <class T> void operator()(T& props) {
+            stream << props;
+        }
+    };
+
+public:
+
AMQHeaderBody();
+ ~AMQHeaderBody();
inline uint8_t type() const { return HEADER_BODY; }
- BasicHeaderProperties* getProperties(){ return &properties; }
- const BasicHeaderProperties* getProperties() const { return &properties; }
- inline uint64_t getContentSize() const { return contentSize; }
- inline void setContentSize(uint64_t _size) { contentSize = _size; }
- virtual ~AMQHeaderBody();
- virtual uint32_t size() const;
- virtual void encode(Buffer& buffer) const;
- virtual void decode(Buffer& buffer, uint32_t size);
- virtual void print(std::ostream& out) const;
+
+ uint32_t size() const;
+ void encode(Buffer& buffer) const;
+ void decode(Buffer& buffer, uint32_t size);
+ uint64_t getContentLength() const;
+ void print(std::ostream& out) const;
void accept(AMQBodyConstVisitor& v) const { v.visit(*this); }
+
+    //returns the first property set of type T held by this header; if there
+    //is none, either appends a default constructed T and returns that
+    //(create == true) or returns 0
+    template <class T> T* get(bool create) {
+        for (PropertyList::iterator i = properties.begin(); i != properties.end(); ++i) {
+            if (T* found = boost::get<T>(&(*i))) return found;
+        }
+        if (!create) return 0;
+
+        properties.push_back(T());
+        return boost::get<T>(&(properties.back()));
+    }
+
+    //returns the first property set of type T held by this header, or 0 if
+    //there is none
+    template <class T> const T* get() const {
+        PropertyList::const_iterator i = properties.begin();
+        while (i != properties.end()) {
+            if (const T* found = boost::get<T>(&(*i))) return found;
+            ++i;
+        }
+        return 0;
+    }
};
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQMethodBody.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQMethodBody.h?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQMethodBody.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQMethodBody.h Tue Aug 28 12:38:17 2007
@@ -49,6 +49,7 @@
virtual MethodId amqpMethodId() const = 0;
virtual ClassId amqpClassId() const = 0;
+ virtual bool isContentBearing() const = 0;
void invoke(AMQP_ServerOperations&);
bool invoke(Invocable*);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/BasicHeaderProperties.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/BasicHeaderProperties.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/BasicHeaderProperties.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/BasicHeaderProperties.cpp Tue Aug 28 12:38:17 2007
@@ -22,7 +22,10 @@
//TODO: This could be easily generated from the spec
-qpid::framing::BasicHeaderProperties::BasicHeaderProperties() : deliveryMode(DeliveryMode(0)), priority(0), timestamp(0){}
+qpid::framing::BasicHeaderProperties::BasicHeaderProperties() : deliveryMode(DeliveryMode(0)),
+ priority(0),
+ timestamp(0),
+ contentLength(0){}
qpid::framing::BasicHeaderProperties::~BasicHeaderProperties(){}
uint32_t qpid::framing::BasicHeaderProperties::size() const{
@@ -41,6 +44,7 @@
if(userId.length() > 0) bytes += userId.length() + 1;
if(appId.length() > 0) bytes += appId.length() + 1;
if(clusterId.length() > 0) bytes += clusterId.length() + 1;
+ if(contentLength != 0) bytes += 8;
return bytes;
}
@@ -63,6 +67,7 @@
if(userId.length() > 0) buffer.putShortString(userId);
if(appId.length() > 0) buffer.putShortString(appId);
if(clusterId.length() > 0) buffer.putShortString(clusterId);
+ if(contentLength != 0) buffer.putLongLong(contentLength);
}
void qpid::framing::BasicHeaderProperties::decode(qpid::framing::Buffer& buffer, uint32_t /*size*/){
@@ -81,6 +86,7 @@
if(flags & (1 << 4)) buffer.getShortString(userId);
if(flags & (1 << 3)) buffer.getShortString(appId);
if(flags & (1 << 2)) buffer.getShortString(clusterId);
+ if(flags & (1 << 1)) contentLength = buffer.getLongLong();
}
uint16_t qpid::framing::BasicHeaderProperties::getFlags() const{
@@ -99,5 +105,32 @@
if(userId.length() > 0) flags |= (1 << 4);
if(appId.length() > 0) flags |= (1 << 3);
if(clusterId.length() > 0) flags |= (1 << 2);
+ if(contentLength != 0) flags |= (1 << 1);
return flags;
}
+
+namespace qpid{
+namespace framing{
+
+    //Writes every property that is set (non-empty string, non-zero number,
+    //non-empty headers) to out as "name=value;" and returns out.
+    std::ostream& operator<<(std::ostream& out, const BasicHeaderProperties& props)
+    {
+        if(props.contentType.length() > 0) out << "contentType=" << props.contentType << ";";
+        if(props.contentEncoding.length() > 0) out << "contentEncoding=" << props.contentEncoding << ";";
+        if(props.headers.count() > 0) out << "headers=" << props.headers << ";";
+        //deliveryMode and priority are streamed as int so that an octet sized
+        //field shows up as a number rather than as a raw character.
+        //NOTE(review): the member types are declared in BasicHeaderProperties.h,
+        //which is not visible here - confirm they are no wider than int.
+        if(props.deliveryMode != 0) out << "deliveryMode=" << static_cast<int>(props.deliveryMode) << ";";
+        if(props.priority != 0) out << "priority=" << static_cast<int>(props.priority) << ";";
+        if(props.correlationId.length() > 0) out << "correlationId=" << props.correlationId << ";";
+        if(props.replyTo.length() > 0) out << "replyTo=" << props.replyTo << ";";
+        if(props.expiration.length() > 0) out << "expiration=" << props.expiration << ";";
+        if(props.messageId.length() > 0) out << "messageId=" << props.messageId << ";";
+        if(props.timestamp != 0) out << "timestamp=" << props.timestamp << ";";
+        if(props.type.length() > 0) out << "type=" << props.type << ";";
+        if(props.userId.length() > 0) out << "userId=" << props.userId << ";";
+        if(props.appId.length() > 0) out << "appId=" << props.appId << ";";
+        if(props.clusterId.length() > 0) out << "clusterId=" << props.clusterId << ";";
+        if(props.contentLength != 0) out << "contentLength=" << props.contentLength << ";";
+
+        return out;
+    }
+
+}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/BasicHeaderProperties.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/BasicHeaderProperties.h?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/BasicHeaderProperties.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/BasicHeaderProperties.h Tue Aug 28 12:38:17 2007
@@ -47,15 +47,18 @@
string userId;
string appId;
string clusterId;
+ uint64_t contentLength;
uint16_t getFlags() const;
public:
+ static const uint16_t TYPE = BASIC;
+
BasicHeaderProperties();
virtual ~BasicHeaderProperties();
virtual uint32_t size() const;
virtual void encode(Buffer& buffer) const;
- virtual void decode(Buffer& buffer, uint32_t size);
+ virtual void decode(Buffer& buffer, uint32_t size = 0);
virtual uint8_t classId() const { return BASIC; }
@@ -74,6 +77,7 @@
string getUserId() const { return userId; }
string getAppId() const { return appId; }
string getClusterId() const { return clusterId; }
+ uint64_t getContentLength() const { return contentLength; }
void setContentType(const string& _type){ contentType = _type; }
void setContentEncoding(const string& encoding){ contentEncoding = encoding; }
@@ -89,6 +93,9 @@
void setUserId(const string& _userId){ userId = _userId; }
void setAppId(const string& _appId){appId = _appId; }
void setClusterId(const string& _clusterId){ clusterId = _clusterId; }
+ void setContentLength(uint64_t _contentLength){ contentLength = _contentLength; }
+
+ friend std::ostream& operator<<(std::ostream&, const BasicHeaderProperties&);
/** \internal
* Template to copy between types like BasicHeaderProperties.
@@ -109,6 +116,7 @@
to.setUserId(from.getUserId());
to.setAppId(from.getAppId());
to.setClusterId(from.getClusterId());
+ to.setContentLength(from.getContentLength());
}
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.cpp Tue Aug 28 12:38:17 2007
@@ -51,7 +51,7 @@
void ChannelAdapter::send(const AMQBody& body)
{
assertChannelOpen();
- AMQFrame frame(getVersion(), getId(), body);
+ AMQFrame frame(getId(), body);
handlers.out->handle(frame);
}
Copied: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp (from r569281, incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.cpp)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp?p2=incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp&p1=incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.cpp&r1=569281&r2=570538&rev=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp Tue Aug 28 12:38:17 2007
@@ -19,87 +19,65 @@
*
*/
-#include "ReceivedContent.h"
+#include "FrameSet.h"
#include "qpid/framing/all_method_bodies.h"
+#include "qpid/framing/frame_functors.h"
+#include "qpid/framing/BasicHeaderProperties.h"
+#include "qpid/framing/MessageProperties.h"
+#include "qpid/framing/TypeFilter.h"
-using qpid::client::ReceivedContent;
using namespace qpid::framing;
using namespace boost;
-ReceivedContent::ReceivedContent(const SequenceNumber& _id) : id(_id) {}
+FrameSet::FrameSet(const SequenceNumber& _id) : id(_id) {}
-void ReceivedContent::append(AMQBody* part)
+void FrameSet::append(AMQFrame& part)
{
- parts.push_back(AMQFrame(ProtocolVersion(), 0, part));
+ parts.push_back(part);
}
-bool ReceivedContent::isComplete() const
+bool FrameSet::isComplete() const
{
- if (parts.empty()) {
- return false;
- } else if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
- const AMQHeaderBody* headers(getHeaders());
- return headers && headers->getContentSize() == getContentSize();
- } else if (isA<MessageTransferBody>()) {
- //no longer support references, headers and data are still method fields
- return true;
+ //TODO: should eventually use the 0-10 frame header flags when available
+ const AMQMethodBody* method = getMethod();
+ if (!method) {
+ return false;
+ } else if (method->isContentBearing()) {
+ const AMQHeaderBody* header = getHeaders();
+ if (header) {
+ return header->getContentLength() == getContentSize();
+ } else {
+ return false;
+ }
} else {
- throw Exception("Unknown content class");
+ return true;
}
}
-
-const AMQMethodBody* ReceivedContent::getMethod() const
+const AMQMethodBody* FrameSet::getMethod() const
{
return parts.empty() ? 0 : dynamic_cast<const AMQMethodBody*>(parts[0].getBody());
}
-const AMQHeaderBody* ReceivedContent::getHeaders() const
+const AMQHeaderBody* FrameSet::getHeaders() const
{
return parts.size() < 2 ? 0 : dynamic_cast<const AMQHeaderBody*>(parts[1].getBody());
}
-uint64_t ReceivedContent::getContentSize() const
+AMQHeaderBody* FrameSet::getHeaders()
{
- if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
- uint64_t size(0);
- for (uint i = 2; i < parts.size(); i++) {
- size += parts[i].getBody()->size();
- }
- return size;
- } else if (isA<MessageTransferBody>()) {
- return as<MessageTransferBody>()->getBody().getValue().size();
- } else {
- throw Exception("Unknown content class");
- }
+ return parts.size() < 2 ? 0 : dynamic_cast<AMQHeaderBody*>(parts[1].getBody());
}
-std::string ReceivedContent::getContent() const
+uint64_t FrameSet::getContentSize() const
{
- if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
- string data;
- for (uint i = 2; i < parts.size(); i++) {
- data += static_cast<const AMQContentBody*>(parts[i].getBody())->getData();
- }
- return data;
- } else if (isA<MessageTransferBody>()) {
- return as<MessageTransferBody>()->getBody().getValue();
- } else {
- throw Exception("Unknown content class");
- }
+ SumBodySize sum;
+ map_if(sum, TypeFilter(CONTENT_BODY));
+ return sum.getSize();
}
-void ReceivedContent::populate(Message& msg)
+void FrameSet::getContent(std::string& out) const
{
- if (!isComplete()) throw Exception("Incomplete message");
-
- if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
- const BasicHeaderProperties* properties = dynamic_cast<const BasicHeaderProperties*>(getHeaders()->getProperties());
- BasicHeaderProperties::copy<Message, BasicHeaderProperties>(msg, *properties);
- msg.setData(getContent());
- } else if (isA<MessageTransferBody>()) {
- throw Exception("Transfer not yet supported");
- } else {
- throw Exception("Unknown content class");
- }
+ AccumulateContent accumulator(out);
+ map_if(accumulator, TypeFilter(CONTENT_BODY));
}
Copied: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h (from r569281, incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.h)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h?p2=incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h&p1=incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.h&r1=569281&r2=570538&rev=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h Tue Aug 28 12:38:17 2007
@@ -23,49 +23,76 @@
#include "qpid/framing/amqp_framing.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/SequenceNumber.h"
-#include "ClientMessage.h"
-#ifndef _ReceivedContent_
-#define _ReceivedContent_
+#ifndef _FrameSet_
+#define _FrameSet_
namespace qpid {
-namespace client {
+namespace framing {
/**
- * Collects the frames representing some received 'content'. This
- * provides a raw interface to 'message' data and attributes.
+ * Collects the frames representing a message.
*/
-class ReceivedContent
+class FrameSet
{
- const framing::SequenceNumber id;
- std::vector<framing::AMQFrame> parts;
+ typedef std::vector<AMQFrame> Frames;
+ const SequenceNumber id;
+ Frames parts;
public:
- typedef boost::shared_ptr<ReceivedContent> shared_ptr;
+ typedef boost::shared_ptr<FrameSet> shared_ptr;
- ReceivedContent(const framing::SequenceNumber& id);
- void append(framing::AMQBody* part);
+ FrameSet(const SequenceNumber& id);
+ void append(AMQFrame& part);
bool isComplete() const;
uint64_t getContentSize() const;
- std::string getContent() const;
+ void getContent(std::string&) const;
- const framing::AMQMethodBody* getMethod() const;
- const framing::AMQHeaderBody* getHeaders() const;
+ const AMQMethodBody* getMethod() const;
+ const AMQHeaderBody* getHeaders() const;
+ AMQHeaderBody* getHeaders();
template <class T> bool isA() const {
- const framing::AMQMethodBody* method=getMethod();
+ const AMQMethodBody* method = getMethod();
return method && method->isA<T>();
}
template <class T> const T* as() const {
- const framing::AMQMethodBody* method=getMethod();
+ const AMQMethodBody* method = getMethod();
return (method && method->isA<T>()) ? dynamic_cast<const T*>(method) : 0;
}
- const framing::SequenceNumber& getId() const { return id; }
+    //returns the properties of type T from the header frame, or 0 if there
+    //is no header frame or it holds no such properties
+    template <class T> const T* getHeaderProperties() const {
+        if (const AMQHeaderBody* header = getHeaders()) {
+            return header->get<T>();
+        }
+        return 0;
+    }
+
+ const SequenceNumber& getId() const { return id; }
+
+    //erases every frame for which predicate returns true
+    template <class P> void remove(P predicate) {
+        //qualified so that the algorithm chosen does not depend on argument
+        //dependent lookup through the iterator or predicate types
+        parts.erase(std::remove_if(parts.begin(), parts.end(), predicate), parts.end());
+    }
+
+    //applies functor to every frame in order. The functor is invoked through
+    //the caller's reference (for_each would work on a copy), so any state it
+    //accumulates is visible to the caller afterwards.
+    template <class F> void map(F& functor) {
+        for (Frames::iterator i = parts.begin(); i != parts.end(); ++i) {
+            functor(*i);
+        }
+    }
- void populate(Message& msg);
+    //applies functor to every frame in order without modifying the set. The
+    //functor is invoked through the caller's reference (for_each would work
+    //on a copy), so any state it accumulates is visible to the caller.
+    template <class F> void map(F& functor) const {
+        for (Frames::const_iterator i = parts.begin(); i != parts.end(); ++i) {
+            functor(*i);
+        }
+    }
+
+    //applies functor, in order, to each frame accepted by predicate
+    template <class F, class P> void map_if(F& functor, P predicate) {
+        Frames::iterator frame = parts.begin();
+        while (frame != parts.end()) {
+            if (predicate(*frame)) {
+                functor(*frame);
+            }
+            ++frame;
+        }
+    }
+
+    //applies functor, in order, to each frame accepted by predicate; the
+    //frames are only accessed as const
+    template <class F, class P> void map_if(F& functor, P predicate) const {
+        Frames::const_iterator frame = parts.begin();
+        while (frame != parts.end()) {
+            if (predicate(*frame)) {
+                functor(*frame);
+            }
+            ++frame;
+        }
+    }
};
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ProtocolInitiation.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ProtocolInitiation.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ProtocolInitiation.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ProtocolInitiation.cpp Tue Aug 28 12:38:17 2007
@@ -31,7 +31,7 @@
ProtocolInitiation::~ProtocolInitiation(){}
-void ProtocolInitiation::encode(Buffer& buffer){
+void ProtocolInitiation::encode(Buffer& buffer) const {
buffer.putOctet('A');
buffer.putOctet('M');
buffer.putOctet('Q');
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ProtocolInitiation.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ProtocolInitiation.h?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ProtocolInitiation.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ProtocolInitiation.h Tue Aug 28 12:38:17 2007
@@ -39,7 +39,7 @@
ProtocolInitiation(uint8_t major, uint8_t minor);
ProtocolInitiation(ProtocolVersion p);
virtual ~ProtocolInitiation();
- virtual void encode(Buffer& buffer);
+ virtual void encode(Buffer& buffer) const;
virtual bool decode(Buffer& buffer);
inline virtual uint32_t size() const { return 8; }
inline uint8_t getMajor() const { return version.getMajor(); }
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.cpp?rev=570538&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.cpp Tue Aug 28 12:38:17 2007
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "SendContent.h"
+
+// Stores the handler that frames are passed to, the channel id set on every
+// frame that is passed on and the frame size limit used when splitting.
+qpid::framing::SendContent::SendContent(FrameHandler& target, uint16_t channelId, uint16_t frameLimit)
+    : handler(target),
+      channel(channelId),
+      maxFrameSize(frameLimit)
+{
+}
+
+// Passes the content frame f to the handler on this functor's channel. A body
+// no larger than the maximum content size goes out as a copy of f; a larger
+// body is sent as a series of smaller content frames instead.
+// NOTE(review): assumes maxFrameSize exceeds AMQFrame::frameOverhead() and
+// that f holds an AMQContentBody - confirm against callers.
+void qpid::framing::SendContent::operator()(AMQFrame& f) const
+{
+    uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
+    const AMQContentBody* body(f.castBody<AMQContentBody>());
+
+    if (!(body->size() > maxContentSize)) {
+        //fits as it is, only the channel needs to be set
+        AMQFrame copy(f);
+        copy.setChannel(channel);
+        handler.handle(copy);
+        return;
+    }
+
+    //first all the fragments of maximum size...
+    uint32_t offset = 0;
+    int fullChunks = body->size() / maxContentSize;
+    while (fullChunks > 0) {
+        sendFragment(*body, offset, maxContentSize);
+        offset += maxContentSize;
+        --fullChunks;
+    }
+
+    //...then whatever is left over
+    uint32_t remainder = body->size() % maxContentSize;
+    if (remainder != 0) {
+        sendFragment(*body, offset, remainder);
+    }
+}
+
+// Sends up to 'size' bytes of body's data, starting at 'offset', to the
+// handler as a new content frame on this functor's channel.
+void qpid::framing::SendContent::sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size) const
+{
+    const std::string slice = body.getData().substr(offset, size);
+    AMQFrame fragment(channel, AMQContentBody(slice));
+    handler.handle(fragment);
+}
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.h?rev=570538&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.h Tue Aug 28 12:38:17 2007
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/FrameHandler.h"
+
+#ifndef _SendContent_
+#define _SendContent_
+
+namespace qpid {
+namespace framing {
+
+/**
+ * Functor that sends frame to handler, refragmenting if
+ * necessary. Currently only works on content frames but this could be
+ * changed once we support multi-frame segments in general.
+ */
+class SendContent
+{
+    //a reference member cannot be declared mutable; the const operator()
+    //can still call through it because the handler referred to is not const
+    FrameHandler& handler;
+    //channel id given to every frame passed to the handler
+    const uint16_t channel;
+    //largest frame to send; the frame overhead is deducted from this to
+    //obtain the largest content body
+    const uint16_t maxFrameSize;
+
+    //sends part of body's data (size bytes from offset) as a new content frame
+    void sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size) const;
+public:
+    SendContent(FrameHandler& _handler, uint16_t channel, uint16_t _maxFrameSize);
+    //passes f to the handler, splitting the content if it is too large
+    void operator()(AMQFrame& f) const;
+};
+
+}
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TypeFilter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TypeFilter.h?rev=570538&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TypeFilter.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TypeFilter.h Tue Aug 28 12:38:17 2007
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/FrameHandler.h"
+
+#ifndef _TypeFilter_
+#define _TypeFilter_
+
+namespace qpid {
+namespace framing {
+
+/**
+ * Predicate that selects frames by type
+ */
+class TypeFilter
+{
+    //the body types this filter accepts
+    std::vector<uint8_t> types;
+public:
+    TypeFilter(uint8_t type) { add(type); }
+    TypeFilter(uint8_t type1, uint8_t type2) { add(type1); add(type2); }
+    //adds another body type to those accepted
+    void add(uint8_t type) { types.push_back(type); }
+    //returns true if the type of f's body is one of the accepted types
+    bool operator()(const AMQFrame& f) const
+    {
+        //qualified: an unqualified find is only found through argument
+        //dependent lookup on the vector's iterator type
+        //NOTE(review): this header does not include <algorithm> or <vector>
+        //itself - presumably they arrive via the framing headers; confirm
+        return std::find(types.begin(), types.end(), f.getBody()->type()) != types.end();
+    }
+};
+
+}
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TypeFilter.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TypeFilter.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h?rev=570538&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h Tue Aug 28 12:38:17 2007
@@ -0,0 +1,108 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include <ostream>
+#include <iostream>
+#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/Buffer.h"
+
+#ifndef _frame_functors_
+#define _frame_functors_
+
+namespace qpid {
+namespace framing {
+
+//Functor that adds up the size() of every frame it is applied to.
+class SumFrameSize
+{
+    uint64_t size;
+public:
+    SumFrameSize() : size(0) {}
+    void operator()(const AMQFrame& f) { size += f.size(); }
+    //const so the total can also be read from a const functor
+    uint64_t getSize() const { return size; }
+};
+
+//Functor that adds up the size() of the body of every frame it is applied to.
+class SumBodySize
+{
+    uint64_t size;
+public:
+    SumBodySize() : size(0) {}
+    void operator()(const AMQFrame& f) { size += f.getBody()->size(); }
+    //const so the total can also be read from a const functor
+    uint64_t getSize() const { return size; }
+};
+
+//Functor that encodes each frame it is applied to into a buffer.
+class EncodeFrame
+{
+    Buffer& out;
+public:
+    EncodeFrame(Buffer& target) : out(target) {}
+    void operator()(const AMQFrame& frame)
+    {
+        frame.encode(out);
+    }
+};
+
+//Functor that encodes the body of each frame it is applied to into a buffer.
+class EncodeBody
+{
+    Buffer& out;
+public:
+    EncodeBody(Buffer& target) : out(target) {}
+    void operator()(const AMQFrame& frame)
+    {
+        frame.getBody()->encode(out);
+    }
+};
+
+//Functor that appends the data of each content frame it is applied to onto
+//the string supplied at construction.
+class AccumulateContent
+{
+    std::string& content;
+public:
+    AccumulateContent(std::string& target) : content(target) {}
+    void operator()(const AMQFrame& frame)
+    {
+        const AMQContentBody* body = frame.castBody<AMQContentBody>();
+        content += body->getData();
+    }
+};
+
+//Functor that passes a copy of each frame it is applied to on to a handler,
+//with the copy's channel set to the one given at construction.
+class Relay
+{
+    FrameHandler& handler;
+    const uint16_t channel;
+
+public:
+    Relay(FrameHandler& target, uint16_t channelId) : handler(target), channel(channelId) {}
+
+    void operator()(AMQFrame& frame)
+    {
+        //the frame passed in is left untouched, only the copy is retargeted
+        AMQFrame relayed(frame);
+        relayed.setChannel(channel);
+        handler.handle(relayed);
+    }
+};
+
+//Functor that writes each frame it is applied to, on a line of its own, to
+//the given stream.
+class Print
+{
+    std::ostream& stream;
+public:
+    Print(std::ostream& target) : stream(target) {}
+
+    void operator()(const AMQFrame& frame)
+    {
+        stream << frame;
+        stream << std::endl;
+    }
+};
+
+}
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp Tue Aug 28 12:38:17 2007
@@ -19,12 +19,14 @@
*
*/
#include "qpid/broker/BrokerChannel.h"
-#include "qpid/broker/BrokerMessage.h"
#include "qpid/broker/BrokerQueue.h"
#include "qpid/broker/FanOutExchange.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/MessageDelivery.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid_test_plugin.h"
#include <iostream>
+#include <sstream>
#include <memory>
#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/framing/AMQFrame.h"
@@ -72,7 +74,6 @@
CPPUNIT_TEST_SUITE(BrokerChannelTest);
CPPUNIT_TEST(testConsumerMgmt);;
CPPUNIT_TEST(testDeliveryNoAck);
- CPPUNIT_TEST(testStaging);
CPPUNIT_TEST(testQueuePolicy);
CPPUNIT_TEST(testFlow);
CPPUNIT_TEST(testAsyncMesgToMoreThanOneQueue);
@@ -155,7 +156,16 @@
void check()
{
- CPPUNIT_ASSERT(expected.empty());
+ if (!expected.empty()) { // entries still queued: list them rather than fail with a bare assertion
+ std::stringstream error;
+ error << "Expected: ";
+ while (!expected.empty()) {
+ MethodCall& m = expected.front();
+ error << m.name << "(" << m.msg << ", '" << m.data << "'); "; // one "name(msg, 'data'); " entry per queued call
+ expected.pop(); // the loop empties the queue while building the message
+ }
+ CPPUNIT_FAIL(error.str());
+ }
}
};
@@ -173,7 +183,7 @@
void testConsumerMgmt(){
Queue::shared_ptr queue(new Queue("my_queue"));
- Channel channel(connection, recorder, 0, 0);
+ Channel channel(connection, recorder, 0);
channel.open();
CPPUNIT_ASSERT(!channel.exists("my_consumer"));
@@ -203,7 +213,7 @@
Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
Queue::shared_ptr queue(new Queue("my_queue"));
string tag("test");
- DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken("my-token"));
+ DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken("my-token"));
channel.consume(token, tag, queue, false, false, 0);
queue->deliver(msg);
sleep(2);
@@ -213,48 +223,6 @@
CPPUNIT_ASSERT_EQUAL(token, recorder.delivered.front().second);
}
- void testStaging(){
- MockMessageStore store;
- connection.setFrameMax(1000);
- connection.setStagingThreshold(10);
- Channel channel(connection, recorder, 1, &store);
- const string data[] = {"abcde", "fghij", "klmno"};
-
- Message* msg = new BasicMessage(0, "my_exchange", "my_routing_key", false, false);
-
- store.expect();
- store.stage(*msg);
- for (int i = 0; i < 3; i++) {
- store.appendContent(*msg, data[i]);
- }
- store.destroy(*msg);
- store.test();
-
- Exchange::shared_ptr exchange =
- broker->getExchanges().declare("my_exchange", "fanout").first;
- Queue::shared_ptr queue(new Queue("my_queue"));
- exchange->bind(queue, "", 0);
-
- AMQHeaderBody header(BASIC);
- uint64_t contentSize(0);
- for (int i = 0; i < 3; i++) {
- contentSize += data[i].size();
- }
- header.setContentSize(contentSize);
- channel.handlePublish(msg);
- channel.handleHeader(&header);
-
- for (int i = 0; i < 3; i++) {
- AMQContentBody body(data[i]);
- channel.handleContent(&body);
- }
- Message::shared_ptr msg2 = queue->dequeue();
- CPPUNIT_ASSERT_EQUAL(msg, msg2.get());
- msg2.reset();//should trigger destroy call
-
- store.check();
- }
-
//NOTE: strictly speaking this should/could be part of QueueTest,
//but as it can usefully use the same utility classes as this
@@ -279,7 +247,6 @@
store.expect();
store.stage(*msg3);
- store.destroy(*msg3);
store.test();
Queue::shared_ptr queue(new Queue("my_queue", false, &store, 0));
@@ -348,16 +315,17 @@
CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0].getChannel());
CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>(handler.frames[0].getBody()));
- const string data("abcdefghijklmn");
-
- Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
- addContent(msg, data);
Queue::shared_ptr queue(new Queue("my_queue"));
string tag("test");
- DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken("my-token"));
+ DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken("my-token"));
channel.consume(token, tag, queue, false, false, 0);
channel.flow(false);
+
+ //'publish' a message
+ Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
+ addContent(msg, "abcdefghijklmn");
queue->deliver(msg);
+
//ensure no messages have been delivered
CPPUNIT_ASSERT_EQUAL((size_t) 0, recorder.delivered.size());
@@ -369,21 +337,26 @@
CPPUNIT_ASSERT_EQUAL(token, recorder.delivered.front().second);
}
- Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize)
+ Message::shared_ptr createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize)
{
- BasicMessage* msg = new BasicMessage(
- 0, exchange, routingKey, false, false);
- AMQHeaderBody header(BASIC);
- header.setContentSize(contentSize);
- msg->setHeader(&header);
- msg->getHeaderProperties()->setMessageId(messageId);
+ Message::shared_ptr msg(new Message()); // frames are appended to this message below
+
+ AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0)); // frame wrapping a MessageTransferBody built with the exchange name
+ AMQFrame header(0, AMQHeaderBody()); // frame wrapping a default-constructed header body
+
+ msg->getFrames().append(method);
+ msg->getFrames().append(header);
+ MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true); // NOTE(review): 'true' presumably asks get<> to create the properties if absent - confirm against AMQHeaderBody
+ props->setContentLength(contentSize);
+ props->setMessageId(messageId);
+ msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey); // routing key is set on DeliveryProperties, not on MessageProperties
return msg;
}
void addContent(Message::shared_ptr msg, const string& data)
{
- AMQContentBody body(data);
- msg->addContent(&body);
+ AMQFrame content(0, AMQContentBody(data)); // wraps data in a content-body frame; first argument is presumably the channel id - confirm against AMQFrame
+ msg->getFrames().append(content); // appended to the message's frame set
}
};
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp Tue Aug 28 12:38:17 2007
@@ -34,7 +34,7 @@
/** Verify membership in a cluster with one member. */
BOOST_AUTO_TEST_CASE(testClusterOne) {
TestCluster cluster("clusterOne", "amqp:one:1");
- AMQFrame send(VER, 1, SessionOpenBody(VER));
+ AMQFrame send(1, SessionOpenBody(VER));
cluster.handle(send);
AMQFrame received;
BOOST_REQUIRE(cluster.received.waitPop(received));
@@ -60,7 +60,7 @@
BOOST_REQUIRE(cluster.waitFor(2)); // Myself and child.
// Exchange frames with child.
- AMQFrame send(VER, 1, SessionOpenBody(VER));
+ AMQFrame send(1, SessionOpenBody(VER));
cluster.handle(send);
AMQFrame received;
BOOST_REQUIRE(cluster.received.waitPop(received));
@@ -91,8 +91,8 @@
/** Test the ClassifierHandler */
BOOST_AUTO_TEST_CASE(testClassifierHandlerWiring) {
- AMQFrame queueDecl(VER, 0, QueueDeclareBody(VER));
- AMQFrame messageTrans(VER, 0, MessageTransferBody(VER));
+ AMQFrame queueDecl(0, QueueDeclareBody(VER));
+ AMQFrame messageTrans(0, MessageTransferBody(VER));
shared_ptr<CountHandler> wiring(new CountHandler());
shared_ptr<CountHandler> other(new CountHandler());
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp Tue Aug 28 12:38:17 2007
@@ -40,7 +40,7 @@
BOOST_CHECK_TYPEID_EQUAL(SessionOpenBody, *frame.getBody());
BOOST_CHECK_EQUAL(2u, cluster.size()); // Me and parent
- AMQFrame send(VER, 1, SessionAttachedBody(VER));
+ AMQFrame send(1, SessionAttachedBody(VER));
cluster.handle(send);
BOOST_REQUIRE(cluster.received.waitPop(frame));
BOOST_CHECK_TYPEID_EQUAL(SessionAttachedBody, *frame.getBody());
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp Tue Aug 28 12:38:17 2007
@@ -31,6 +31,7 @@
#include "qpid_test_plugin.h"
#include <iostream>
#include "qpid/framing/BasicGetBody.h"
+#include "MessageUtils.h"
using namespace qpid::broker;
using namespace qpid::framing;
@@ -63,7 +64,7 @@
queue.reset();
queue2.reset();
- Message::shared_ptr msgPtr(new BasicMessage(0, "e", "A", true, true));
+ Message::shared_ptr msgPtr(MessageUtils::createMessage("exchange", "key", "id"));
DeliverableMessage msg(msgPtr);
topic.route(msg, "abc", 0);
direct.route(msg, "abc", 0);
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp Tue Aug 28 12:38:17 2007
@@ -137,8 +137,7 @@
{
std::string a = "hostA";
std::string b = "hostB";
- AMQFrame in(version, 999,
- ConnectionRedirectBody(version, a, b));
+ AMQFrame in(999, ConnectionRedirectBody(version, a, b));
in.encode(buffer);
buffer.flip();
AMQFrame out;
@@ -149,7 +148,7 @@
void testBasicConsumeOkBodyFrame()
{
std::string s = "hostA";
- AMQFrame in(version, 999, BasicConsumeOkBody(version, s));
+ AMQFrame in(999, BasicConsumeOkBody(version, s));
in.encode(buffer);
buffer.flip();
AMQFrame out;
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/HeaderTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/HeaderTest.cpp?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/HeaderTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/HeaderTest.cpp Tue Aug 28 12:38:17 2007
@@ -36,8 +36,8 @@
void testGenericProperties()
{
- AMQHeaderBody body(BASIC);
- dynamic_cast<BasicHeaderProperties*>(body.getProperties())->getHeaders().setString("A", "BCDE");
+ AMQHeaderBody body;
+ body.get<BasicHeaderProperties>(true)->getHeaders().setString("A", "BCDE");
Buffer buffer(100);
body.encode(buffer);
@@ -45,7 +45,7 @@
AMQHeaderBody body2;
body2.decode(buffer, body.size());
BasicHeaderProperties* props =
- dynamic_cast<BasicHeaderProperties*>(body2.getProperties());
+ body2.get<BasicHeaderProperties>(true);
CPPUNIT_ASSERT_EQUAL(std::string("BCDE"),
props->getHeaders().getString("A"));
}
@@ -64,10 +64,11 @@
string userId("guest");
string appId("just testing");
string clusterId("no clustering required");
+ uint64_t contentLength(54321);
- AMQHeaderBody body(BASIC);
+ AMQFrame out(0, AMQHeaderBody());
BasicHeaderProperties* properties =
- dynamic_cast<BasicHeaderProperties*>(body.getProperties());
+ out.castBody<AMQHeaderBody>()->get<BasicHeaderProperties>(true);
properties->setContentType(contentType);
properties->getHeaders().setString("A", "BCDE");
properties->setDeliveryMode(deliveryMode);
@@ -81,13 +82,14 @@
properties->setUserId(userId);
properties->setAppId(appId);
properties->setClusterId(clusterId);
+ properties->setContentLength(contentLength);
Buffer buffer(10000);
- body.encode(buffer);
+ out.encode(buffer);
buffer.flip();
- AMQHeaderBody temp;
- temp.decode(buffer, body.size());
- properties = dynamic_cast<BasicHeaderProperties*>(temp.getProperties());
+ AMQFrame in;
+ in.decode(buffer);
+ properties = in.castBody<AMQHeaderBody>()->get<BasicHeaderProperties>(true);
CPPUNIT_ASSERT_EQUAL(contentType, properties->getContentType());
CPPUNIT_ASSERT_EQUAL(std::string("BCDE"), properties->getHeaders().getString("A"));
@@ -102,6 +104,7 @@
CPPUNIT_ASSERT_EQUAL(userId, properties->getUserId());
CPPUNIT_ASSERT_EQUAL(appId, properties->getAppId());
CPPUNIT_ASSERT_EQUAL(clusterId, properties->getClusterId());
+ CPPUNIT_ASSERT_EQUAL(contentLength, properties->getContentLength());
}
void testSomeSpecificProperties(){
@@ -111,9 +114,9 @@
string expiration("Z");
uint64_t timestamp(0xabe4a34a);
- AMQHeaderBody body(BASIC);
+ AMQHeaderBody body;
BasicHeaderProperties* properties =
- dynamic_cast<BasicHeaderProperties*>(body.getProperties());
+ body.get<BasicHeaderProperties>(true);
properties->setContentType(contentType);
properties->setDeliveryMode(deliveryMode);
properties->setPriority(priority);
@@ -125,7 +128,7 @@
buffer.flip();
AMQHeaderBody temp;
temp.decode(buffer, body.size());
- properties = dynamic_cast<BasicHeaderProperties*>(temp.getProperties());
+ properties = temp.get<BasicHeaderProperties>(true);
CPPUNIT_ASSERT_EQUAL(contentType, properties->getContentType());
CPPUNIT_ASSERT_EQUAL((int) deliveryMode, (int) properties->getDeliveryMode());
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=570538&r1=570537&r2=570538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Tue Aug 28 12:38:17 2007
@@ -82,11 +82,8 @@
DtxWorkRecordTest \
ExchangeTest \
HeadersExchangeTest \
- InMemoryContentTest \
- LazyLoadedContentTest \
MessageBuilderTest \
MessageTest \
- ReferenceTest \
QueueRegistryTest \
QueueTest \
QueuePolicyTest \
@@ -142,6 +139,7 @@
.valgrind.supp-default \
.valgrindrc-default \
InProcessBroker.h \
+ MessageUtils.h \
MockChannel.h \
MockConnectionInputHandler.h \
TxMocks.h \