You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cc...@apache.org on 2007/01/09 20:44:51 UTC
svn commit: r494540 - in /incubator/qpid/trunk/qpid/cpp: lib/broker/
lib/client/ lib/common/framing/ tests/
Author: cctrieloff
Date: Tue Jan 9 11:44:50 2007
New Revision: 494540
URL: http://svn.apache.org/viewvc?view=rev&rev=494540
Log:
Most of remaining version changes for C++. Still need to deal with AMQFrame
defualt constructor and do some clean up here and there..
Modified:
incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.cpp
incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.h
incubator/qpid/trunk/qpid/cpp/lib/broker/Content.h
incubator/qpid/trunk/qpid/cpp/lib/broker/InMemoryContent.cpp
incubator/qpid/trunk/qpid/cpp/lib/broker/InMemoryContent.h
incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.cpp
incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.h
incubator/qpid/trunk/qpid/cpp/lib/client/ClientChannel.cpp
incubator/qpid/trunk/qpid/cpp/lib/client/Connection.cpp
incubator/qpid/trunk/qpid/cpp/lib/client/Connector.cpp
incubator/qpid/trunk/qpid/cpp/lib/client/Connector.h
incubator/qpid/trunk/qpid/cpp/lib/common/framing/AMQFrame.cpp
incubator/qpid/trunk/qpid/cpp/lib/common/framing/AMQFrame.h
incubator/qpid/trunk/qpid/cpp/tests/BodyHandlerTest.cpp
incubator/qpid/trunk/qpid/cpp/tests/FramingTest.cpp
incubator/qpid/trunk/qpid/cpp/tests/InMemoryContentTest.cpp
incubator/qpid/trunk/qpid/cpp/tests/LazyLoadedContentTest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.cpp?view=diff&rev=494540&r1=494539&r2=494540
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.cpp Tue Jan 9 11:44:50 2007
@@ -80,8 +80,8 @@
u_int32_t framesize,
ProtocolVersion* version){
// CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction
- out->send(new AMQFrame(channel, new BasicDeliverBody(*version, consumerTag, deliveryTag, redelivered, exchange, routingKey)));
- sendContent(out, channel, framesize);
+ out->send(new AMQFrame(*version, channel, new BasicDeliverBody(*version, consumerTag, deliveryTag, redelivered, exchange, routingKey)));
+ sendContent(out, channel, framesize, version);
}
void Message::sendGetOk(OutputHandler* out,
@@ -91,16 +91,16 @@
u_int32_t framesize,
ProtocolVersion* version){
// CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction
- out->send(new AMQFrame(channel, new BasicGetOkBody(*version, deliveryTag, redelivered, exchange, routingKey, messageCount)));
- sendContent(out, channel, framesize);
+ out->send(new AMQFrame(*version, channel, new BasicGetOkBody(*version, deliveryTag, redelivered, exchange, routingKey, messageCount)));
+ sendContent(out, channel, framesize, version);
}
-void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize){
+void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize, ProtocolVersion* version){
AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header);
- out->send(new AMQFrame(channel, headerBody));
+ out->send(new AMQFrame(*version, channel, headerBody));
Mutex::ScopedLock locker(contentLock);
- if (content.get()) content->send(out, channel, framesize);
+ if (content.get()) content->send(*version, out, channel, framesize);
}
BasicHeaderProperties* Message::getHeaderProperties(){
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.h?view=diff&rev=494540&r1=494539&r2=494540
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.h Tue Jan 9 11:44:50 2007
@@ -58,7 +58,7 @@
qpid::sys::Mutex contentLock;
void sendContent(qpid::framing::OutputHandler* out,
- int channel, u_int32_t framesize);
+ int channel, u_int32_t framesize, qpid::framing::ProtocolVersion* version);
public:
typedef boost::shared_ptr<Message> shared_ptr;
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/Content.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/Content.h?view=diff&rev=494540&r1=494539&r2=494540
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/Content.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/Content.h Tue Jan 9 11:44:50 2007
@@ -24,6 +24,7 @@
#include <AMQContentBody.h>
#include <Buffer.h>
#include <OutputHandler.h>
+#include <ProtocolVersion.h>
namespace qpid {
namespace broker {
@@ -31,7 +32,7 @@
public:
virtual void add(qpid::framing::AMQContentBody::shared_ptr data) = 0;
virtual u_int32_t size() = 0;
- virtual void send(qpid::framing::OutputHandler* out, int channel, u_int32_t framesize) = 0;
+ virtual void send(qpid::framing::ProtocolVersion& version, qpid::framing::OutputHandler* out, int channel, u_int32_t framesize) = 0;
virtual void encode(qpid::framing::Buffer& buffer) = 0;
virtual void destroy() = 0;
virtual ~Content(){}
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/InMemoryContent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/InMemoryContent.cpp?view=diff&rev=494540&r1=494539&r2=494540
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/InMemoryContent.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/InMemoryContent.cpp Tue Jan 9 11:44:50 2007
@@ -38,24 +38,24 @@
return sum;
}
-void InMemoryContent::send(OutputHandler* out, int channel, u_int32_t framesize)
+void InMemoryContent::send(qpid::framing::ProtocolVersion& version, OutputHandler* out, int channel, u_int32_t framesize)
{
for (content_iterator i = content.begin(); i != content.end(); i++) {
if ((*i)->size() > framesize) {
u_int32_t offset = 0;
for (int chunk = (*i)->size() / framesize; chunk > 0; chunk--) {
string data = (*i)->getData().substr(offset, framesize);
- out->send(new AMQFrame(channel, new AMQContentBody(data)));
+ out->send(new AMQFrame(version, channel, new AMQContentBody(data)));
offset += framesize;
}
u_int32_t remainder = (*i)->size() % framesize;
if (remainder) {
string data = (*i)->getData().substr(offset, remainder);
- out->send(new AMQFrame(channel, new AMQContentBody(data)));
+ out->send(new AMQFrame(version, channel, new AMQContentBody(data)));
}
} else {
AMQBody::shared_ptr contentBody = static_pointer_cast<AMQBody, AMQContentBody>(*i);
- out->send(new AMQFrame(channel, contentBody));
+ out->send(new AMQFrame(version, channel, contentBody));
}
}
}
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/InMemoryContent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/InMemoryContent.h?view=diff&rev=494540&r1=494539&r2=494540
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/InMemoryContent.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/InMemoryContent.h Tue Jan 9 11:44:50 2007
@@ -24,6 +24,7 @@
#include <Content.h>
#include <vector>
+
namespace qpid {
namespace broker {
class InMemoryContent : public Content{
@@ -34,7 +35,7 @@
public:
void add(qpid::framing::AMQContentBody::shared_ptr data);
u_int32_t size();
- void send(qpid::framing::OutputHandler* out, int channel, u_int32_t framesize);
+ void send(qpid::framing::ProtocolVersion& version, qpid::framing::OutputHandler* out, int channel, u_int32_t framesize);
void encode(qpid::framing::Buffer& buffer);
void destroy();
~InMemoryContent(){}
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.cpp?view=diff&rev=494540&r1=494539&r2=494540
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.cpp Tue Jan 9 11:44:50 2007
@@ -36,19 +36,19 @@
return 0;//all content is written as soon as it is added
}
-void LazyLoadedContent::send(OutputHandler* out, int channel, u_int32_t framesize)
+void LazyLoadedContent::send(qpid::framing::ProtocolVersion& version, OutputHandler* out, int channel, u_int32_t framesize)
{
if (expectedSize > framesize) {
for (u_int64_t offset = 0; offset < expectedSize; offset += framesize) {
u_int64_t remaining = expectedSize - offset;
string data;
store->loadContent(msg, data, offset, remaining > framesize ? framesize : remaining);
- out->send(new AMQFrame(channel, new AMQContentBody(data)));
+ out->send(new AMQFrame(version, channel, new AMQContentBody(data)));
}
} else {
string data;
store->loadContent(msg, data, 0, expectedSize);
- out->send(new AMQFrame(channel, new AMQContentBody(data)));
+ out->send(new AMQFrame(version, channel, new AMQContentBody(data)));
}
}
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.h?view=diff&rev=494540&r1=494539&r2=494540
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.h Tue Jan 9 11:44:50 2007
@@ -34,7 +34,7 @@
LazyLoadedContent(MessageStore* const store, Message* const msg, u_int64_t expectedSize);
void add(qpid::framing::AMQContentBody::shared_ptr data);
u_int32_t size();
- void send(qpid::framing::OutputHandler* out, int channel, u_int32_t framesize);
+ void send(qpid::framing::ProtocolVersion& version, qpid::framing::OutputHandler* out, int channel, u_int32_t framesize);
void encode(qpid::framing::Buffer& buffer);
void destroy();
~LazyLoadedContent(){}
Modified: incubator/qpid/trunk/qpid/cpp/lib/client/ClientChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/client/ClientChannel.cpp?view=diff&rev=494540&r1=494539&r2=494540
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/client/ClientChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/client/ClientChannel.cpp Tue Jan 9 11:44:50 2007
@@ -56,9 +56,9 @@
void Channel::setQos(){
// AMQP version management change - kpvdr 2006-11-20
// TODO: Make this class version-aware and link these hard-wired numbers to that version
- sendAndReceive(new AMQFrame(id, new BasicQosBody(version, 0, prefetch, false)), method_bodies.basic_qos_ok);
+ sendAndReceive(new AMQFrame(version, id, new BasicQosBody(version, 0, prefetch, false)), method_bodies.basic_qos_ok);
if(transactional){
- sendAndReceive(new AMQFrame(id, new TxSelectBody(version)), method_bodies.tx_select_ok);
+ sendAndReceive(new AMQFrame(version, id, new TxSelectBody(version)), method_bodies.tx_select_ok);
}
}
@@ -66,7 +66,7 @@
string name = exchange.getName();
string type = exchange.getType();
FieldTable args;
- AMQFrame* frame = new AMQFrame(id, new ExchangeDeclareBody(version, 0, name, type, false, false, false, false, !synch, args));
+ AMQFrame* frame = new AMQFrame(version, id, new ExchangeDeclareBody(version, 0, name, type, false, false, false, false, !synch, args));
if(synch){
sendAndReceive(frame, method_bodies.exchange_declare_ok);
}else{
@@ -76,7 +76,7 @@
void Channel::deleteExchange(Exchange& exchange, bool synch){
string name = exchange.getName();
- AMQFrame* frame = new AMQFrame(id, new ExchangeDeleteBody(version, 0, name, false, !synch));
+ AMQFrame* frame = new AMQFrame(version, id, new ExchangeDeleteBody(version, 0, name, false, !synch));
if(synch){
sendAndReceive(frame, method_bodies.exchange_delete_ok);
}else{
@@ -87,7 +87,7 @@
void Channel::declareQueue(Queue& queue, bool synch){
string name = queue.getName();
FieldTable args;
- AMQFrame* frame = new AMQFrame(id, new QueueDeclareBody(version, 0, name, false, false,
+ AMQFrame* frame = new AMQFrame(version, id, new QueueDeclareBody(version, 0, name, false, false,
queue.isExclusive(),
queue.isAutoDelete(), !synch, args));
if(synch){
@@ -105,7 +105,7 @@
void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){
//ticket, queue, ifunused, ifempty, nowait
string name = queue.getName();
- AMQFrame* frame = new AMQFrame(id, new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch));
+ AMQFrame* frame = new AMQFrame(version, id, new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch));
if(synch){
sendAndReceive(frame, method_bodies.queue_delete_ok);
}else{
@@ -116,7 +116,7 @@
void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
string e = exchange.getName();
string q = queue.getName();
- AMQFrame* frame = new AMQFrame(id, new QueueBindBody(version, 0, q, e, key,!synch, args));
+ AMQFrame* frame = new AMQFrame(version, id, new QueueBindBody(version, 0, q, e, key,!synch, args));
if(synch){
sendAndReceive(frame, method_bodies.queue_bind_ok);
}else{
@@ -130,7 +130,7 @@
{
string q = queue.getName();
AMQFrame* frame =
- new AMQFrame(
+ new AMQFrame(version,
id,
new BasicConsumeBody(
version, 0, q, tag, noLocal, ackMode == NO_ACK, false, !synch,
@@ -152,10 +152,10 @@
void Channel::cancel(std::string& tag, bool synch){
Consumer* c = consumers[tag];
if(c->ackMode == LAZY_ACK && c->lastDeliveryTag > 0){
- out->send(new AMQFrame(id, new BasicAckBody(version, c->lastDeliveryTag, true)));
+ out->send(new AMQFrame(version, id, new BasicAckBody(version, c->lastDeliveryTag, true)));
}
- AMQFrame* frame = new AMQFrame(id, new BasicCancelBody(version, (string&) tag, !synch));
+ AMQFrame* frame = new AMQFrame(version, id, new BasicCancelBody(version, (string&) tag, !synch));
if(synch){
sendAndReceive(frame, method_bodies.basic_cancel_ok);
}else{
@@ -171,7 +171,7 @@
for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin()){
Consumer* c = i->second;
if((c->ackMode == LAZY_ACK || c->ackMode == AUTO_ACK) && c->lastDeliveryTag > 0){
- out->send(new AMQFrame(id, new BasicAckBody(c->lastDeliveryTag, true)));
+ out->send(new AMQFrame(version, id, new BasicAckBody(c->lastDeliveryTag, true)));
}
consumers.erase(i);
delete c;
@@ -193,7 +193,7 @@
bool Channel::get(Message& msg, const Queue& queue, int ackMode){
string name = queue.getName();
- AMQFrame* frame = new AMQFrame(id, new BasicGetBody(version, 0, name, ackMode));
+ AMQFrame* frame = new AMQFrame(version, id, new BasicGetBody(version, 0, name, ackMode));
responses.expect();
out->send(frame);
responses.waitForResponse();
@@ -219,25 +219,25 @@
string e = exchange.getName();
string key = routingKey;
- out->send(new AMQFrame(id, new BasicPublishBody(version, 0, e, key, mandatory, immediate)));
+ out->send(new AMQFrame(version, id, new BasicPublishBody(version, 0, e, key, mandatory, immediate)));
//break msg up into header frame and content frame(s) and send these
string data = msg.getData();
msg.header->setContentSize(data.length());
AMQBody::shared_ptr body(static_pointer_cast<AMQBody, AMQHeaderBody>(msg.header));
- out->send(new AMQFrame(id, body));
+ out->send(new AMQFrame(version, id, body));
u_int64_t data_length = data.length();
if(data_length > 0){
u_int32_t frag_size = con->getMaxFrameSize() - 8;//frame itself uses 8 bytes
if(data_length < frag_size){
- out->send(new AMQFrame(id, new AMQContentBody(data)));
+ out->send(new AMQFrame(version, id, new AMQContentBody(data)));
}else{
u_int32_t offset = 0;
u_int32_t remaining = data_length - offset;
while (remaining > 0) {
u_int32_t length = remaining > frag_size ? frag_size : remaining;
string frag(data.substr(offset, length));
- out->send(new AMQFrame(id, new AMQContentBody(frag)));
+ out->send(new AMQFrame(version, id, new AMQContentBody(frag)));
offset += length;
remaining = data_length - offset;
@@ -247,12 +247,12 @@
}
void Channel::commit(){
- AMQFrame* frame = new AMQFrame(id, new TxCommitBody(version));
+ AMQFrame* frame = new AMQFrame(version, id, new TxCommitBody(version));
sendAndReceive(frame, method_bodies.tx_commit_ok);
}
void Channel::rollback(){
- AMQFrame* frame = new AMQFrame(id, new TxRollbackBody(version));
+ AMQFrame* frame = new AMQFrame(version, id, new TxRollbackBody(version));
sendAndReceive(frame, method_bodies.tx_rollback_ok);
}
@@ -377,7 +377,7 @@
if(++(consumer->count) < prefetch) break;
//else drop-through
case AUTO_ACK:
- out->send(new AMQFrame(id, new BasicAckBody(msg.getDeliveryTag(), multiple)));
+ out->send(new AMQFrame(version, id, new BasicAckBody(msg.getDeliveryTag(), multiple)));
consumer->lastDeliveryTag = 0;
}
}
Modified: incubator/qpid/trunk/qpid/cpp/lib/client/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/client/Connection.cpp?view=diff&rev=494540&r1=494539&r2=494540
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/client/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/client/Connection.cpp Tue Jan 9 11:44:50 2007
@@ -35,7 +35,7 @@
Connection::Connection( bool debug, u_int32_t _max_frame_size, qpid::framing::ProtocolVersion* _version) : max_frame_size(_max_frame_size), closed(true),
version(_version->getMajor(),_version->getMinor())
{
- connector = new Connector(debug, _max_frame_size);
+ connector = new Connector(version, debug, _max_frame_size);
}
Connection::~Connection(){
@@ -61,7 +61,7 @@
string response = ((char)0) + uid + ((char)0) + pwd;
string locale("en_US");
responses.expect();
- out->send(new AMQFrame(0, new ConnectionStartOkBody(version, props, mechanism, response, locale)));
+ out->send(new AMQFrame(version, 0, new ConnectionStartOkBody(version, props, mechanism, response, locale)));
/**
* Assume for now that further challenges will not be required
@@ -74,7 +74,7 @@
responses.receive(method_bodies.connection_tune);
ConnectionTuneBody::shared_ptr proposal = boost::dynamic_pointer_cast<ConnectionTuneBody, AMQMethodBody>(responses.getResponse());
- out->send(new AMQFrame(0, new ConnectionTuneOkBody(version, proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat())));
+ out->send(new AMQFrame(version, 0, new ConnectionTuneOkBody(version, proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat())));
u_int16_t heartbeat = proposal->getHeartbeat();
connector->setReadTimeout(heartbeat * 2);
@@ -84,7 +84,7 @@
string capabilities;
string vhost = virtualhost;
responses.expect();
- out->send(new AMQFrame(0, new ConnectionOpenBody(version, vhost, capabilities, true)));
+ out->send(new AMQFrame(version, 0, new ConnectionOpenBody(version, vhost, capabilities, true)));
//receive connection.open-ok (or redirect, but ignore that for now esp. as using force=true).
responses.waitForResponse();
if(responses.validate(method_bodies.connection_open_ok)){
@@ -106,7 +106,7 @@
u_int16_t classId(0);
u_int16_t methodId(0);
- sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(version, code, text, classId, methodId)), method_bodies.connection_close_ok);
+ sendAndReceive(new AMQFrame(version, 0, new ConnectionCloseBody(version, code, text, classId, methodId)), method_bodies.connection_close_ok);
connector->close();
}
}
@@ -118,7 +118,7 @@
channels[channel->id] = channel;
//now send frame to open channel and wait for response
string oob;
- channel->sendAndReceive(new AMQFrame(channel->id, new ChannelOpenBody(version, oob)), method_bodies.channel_open_ok);
+ channel->sendAndReceive(new AMQFrame(version, channel->id, new ChannelOpenBody(version, oob)), method_bodies.channel_open_ok);
channel->setQos();
channel->closed = false;
}
@@ -136,7 +136,7 @@
//send frame to close channel
channel->cancelAll();
channel->closed = true;
- channel->sendAndReceive(new AMQFrame(channel->id, new ChannelCloseBody(version, code, text, classId, methodId)), method_bodies.channel_close_ok);
+ channel->sendAndReceive(new AMQFrame(version, channel->id, new ChannelCloseBody(version, code, text, classId, methodId)), method_bodies.channel_close_ok);
channel->con = 0;
channel->out = 0;
removeChannel(channel);
@@ -209,7 +209,7 @@
std::cout << " [" << methodid << ":" << classid << "]";
}
std::cout << std::endl;
- sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(version, code, msg, classid, methodid)), method_bodies.connection_close_ok);
+ sendAndReceive(new AMQFrame(version, 0, new ConnectionCloseBody(version, code, msg, classid, methodid)), method_bodies.connection_close_ok);
connector->close();
}
@@ -230,7 +230,7 @@
}
void Connection::idleOut(){
- out->send(new AMQFrame(0, new AMQHeartbeatBody()));
+ out->send(new AMQFrame(version, 0, new AMQHeartbeatBody()));
}
void Connection::shutdown(){
Modified: incubator/qpid/trunk/qpid/cpp/lib/client/Connector.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/client/Connector.cpp?view=diff&rev=494540&r1=494539&r2=494540
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/client/Connector.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/client/Connector.cpp Tue Jan 9 11:44:50 2007
@@ -28,10 +28,11 @@
using namespace qpid::framing;
using qpid::QpidError;
-Connector::Connector(bool _debug, u_int32_t buffer_size) :
- debug(_debug),
+Connector::Connector(const qpid::framing::ProtocolVersion& pVersion, bool _debug, u_int32_t buffer_size) :
+ debug(_debug),
receive_buffer_size(buffer_size),
send_buffer_size(buffer_size),
+ version(pVersion),
closed(true),
lastIn(0), lastOut(0),
timeout(0),
@@ -162,7 +163,7 @@
inbuf.move(received);
inbuf.flip();//position = 0, limit = total data read
- AMQFrame frame;
+ AMQFrame frame(version);
while(frame.decode(inbuf)){
if(debug) std::cout << "RECV: " << frame << std::endl;
input->received(&frame);
Modified: incubator/qpid/trunk/qpid/cpp/lib/client/Connector.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/client/Connector.h?view=diff&rev=494540&r1=494539&r2=494540
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/client/Connector.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/client/Connector.h Tue Jan 9 11:44:50 2007
@@ -26,6 +26,7 @@
#include <framing/OutputHandler.h>
#include <framing/InitiationHandler.h>
#include <framing/ProtocolInitiation.h>
+#include <ProtocolVersion.h>
#include <sys/ShutdownHandler.h>
#include <sys/TimeoutHandler.h>
#include <sys/Thread.h>
@@ -41,6 +42,7 @@
const bool debug;
const int receive_buffer_size;
const int send_buffer_size;
+ qpid::framing::ProtocolVersion version;
bool closed;
@@ -73,7 +75,7 @@
void handleClosed();
public:
- Connector(bool debug = false, u_int32_t buffer_size = 1024);
+ Connector(const qpid::framing::ProtocolVersion& pVersion, bool debug = false, u_int32_t buffer_size = 1024);
virtual ~Connector();
virtual void connect(const std::string& host, int port);
virtual void init(qpid::framing::ProtocolInitiation* header);
Modified: incubator/qpid/trunk/qpid/cpp/lib/common/framing/AMQFrame.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/common/framing/AMQFrame.cpp?view=diff&rev=494540&r1=494539&r2=494540
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/common/framing/AMQFrame.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/common/framing/AMQFrame.cpp Tue Jan 9 11:44:50 2007
@@ -24,24 +24,22 @@
using namespace qpid::framing;
-// This only works as a static as the version is currently fixed to 8.0
-// TODO: When the class is version-aware this will need to change
-AMQP_MethodVersionMap AMQFrame::versionMap(8,0);
-
-// AMQP version management change - kpvdr 2-11-17
-// TODO: Make this class version-aware
-AMQFrame::AMQFrame() {}
-
-// AMQP version management change - kpvdr 2006-11-17
-// TODO: Make this class version-aware
-AMQFrame::AMQFrame(u_int16_t _channel, AMQBody* _body) :
-channel(_channel), body(_body)
+
+AMQP_MethodVersionMap AMQFrame::versionMap;
+
+
+AMQFrame::AMQFrame(qpid::framing::ProtocolVersion& _version):
+version(_version)
{}
-// AMQP version management change - kpvdr 2006-11-17
-// TODO: Make this class version-aware
-AMQFrame::AMQFrame(u_int16_t _channel, AMQBody::shared_ptr& _body) :
-channel(_channel), body(_body)
+
+AMQFrame::AMQFrame(qpid::framing::ProtocolVersion& _version, u_int16_t _channel, AMQBody* _body) :
+version(_version), channel(_channel), body(_body)
+{}
+
+
+AMQFrame::AMQFrame(qpid::framing::ProtocolVersion& _version, u_int16_t _channel, AMQBody::shared_ptr& _body) :
+version(_version), channel(_channel), body(_body)
{}
AMQFrame::~AMQFrame() {}
@@ -66,11 +64,7 @@
AMQBody::shared_ptr AMQFrame::createMethodBody(Buffer& buffer){
u_int16_t classId = buffer.getShort();
u_int16_t methodId = buffer.getShort();
- // AMQP version management change - kpvdr 2006-11-16
- // TODO: Make this class version-aware and link these hard-wired numbers to that version
- AMQBody::shared_ptr body(versionMap.createMethodBody(classId, methodId, 8, 0));
- // Origianl stmt:
- // AMQBody::shared_ptr body(createAMQMethodBody(classId, methodId));
+ AMQBody::shared_ptr body(versionMap.createMethodBody(classId, methodId, version.getMajor(), version.getMinor()));
return body;
}
Modified: incubator/qpid/trunk/qpid/cpp/lib/common/framing/AMQFrame.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/common/framing/AMQFrame.h?view=diff&rev=494540&r1=494539&r2=494540
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/common/framing/AMQFrame.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/common/framing/AMQFrame.h Tue Jan 9 11:44:50 2007
@@ -27,6 +27,7 @@
#include <AMQContentBody.h>
#include <AMQHeartbeatBody.h>
#include <AMQP_MethodVersionMap.h>
+#include <AMQP_HighestVersion.h>
#include <Buffer.h>
#ifndef _AMQFrame_
@@ -39,16 +40,17 @@
class AMQFrame : virtual public AMQDataBlock
{
static AMQP_MethodVersionMap versionMap;
-
+ qpid::framing::ProtocolVersion version;
+
u_int16_t channel;
u_int8_t type;//used if the body is decoded separately from the 'head'
AMQBody::shared_ptr body;
AMQBody::shared_ptr createMethodBody(Buffer& buffer);
public:
- AMQFrame();
- AMQFrame(u_int16_t channel, AMQBody* body);
- AMQFrame(u_int16_t channel, AMQBody::shared_ptr& body);
+ AMQFrame(qpid::framing::ProtocolVersion& _version = highestProtocolVersion);
+ AMQFrame(qpid::framing::ProtocolVersion& _version, u_int16_t channel, AMQBody* body);
+ AMQFrame(qpid::framing::ProtocolVersion& _version, u_int16_t channel, AMQBody::shared_ptr& body);
virtual ~AMQFrame();
virtual void encode(Buffer& buffer);
virtual bool decode(Buffer& buffer);
Modified: incubator/qpid/trunk/qpid/cpp/tests/BodyHandlerTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/BodyHandlerTest.cpp?view=diff&rev=494540&r1=494539&r2=494540
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/BodyHandlerTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/tests/BodyHandlerTest.cpp Tue Jan 9 11:44:50 2007
@@ -19,6 +19,7 @@
*
*/
#include <iostream>
+#include <AMQP_HighestVersion.h>
#include <amqp_framing.h>
#include <qpid_test_plugin.h>
using namespace qpid::framing;
@@ -72,7 +73,7 @@
void testMethod()
{
AMQMethodBody* method = new QueueDeclareBody(v);
- AMQFrame frame(0, method);
+ AMQFrame frame(highestProtocolVersion, 0, method);
TestBodyHandler handler(method);
handler.handleBody(frame.getBody());
}
@@ -80,7 +81,7 @@
void testHeader()
{
AMQHeaderBody* header = new AMQHeaderBody();
- AMQFrame frame(0, header);
+ AMQFrame frame(highestProtocolVersion, 0, header);
TestBodyHandler handler(header);
handler.handleBody(frame.getBody());
}
@@ -88,7 +89,7 @@
void testContent()
{
AMQContentBody* content = new AMQContentBody();
- AMQFrame frame(0, content);
+ AMQFrame frame(highestProtocolVersion, 0, content);
TestBodyHandler handler(content);
handler.handleBody(frame.getBody());
}
@@ -96,7 +97,7 @@
void testHeartbeat()
{
AMQHeartbeatBody* heartbeat = new AMQHeartbeatBody();
- AMQFrame frame(0, heartbeat);
+ AMQFrame frame(highestProtocolVersion, 0, heartbeat);
TestBodyHandler handler(heartbeat);
handler.handleBody(frame.getBody());
}
Modified: incubator/qpid/trunk/qpid/cpp/tests/FramingTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/FramingTest.cpp?view=diff&rev=494540&r1=494539&r2=494540
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/FramingTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/tests/FramingTest.cpp Tue Jan 9 11:44:50 2007
@@ -120,7 +120,7 @@
{
std::string a = "hostA";
std::string b = "hostB";
- AMQFrame in(999, new ConnectionRedirectBody(v, a, b));
+ AMQFrame in(highestProtocolVersion, 999, new ConnectionRedirectBody(v, a, b));
in.encode(buffer);
buffer.flip();
AMQFrame out;
@@ -131,7 +131,7 @@
void testBasicConsumeOkBodyFrame()
{
std::string s = "hostA";
- AMQFrame in(999, new BasicConsumeOkBody(v, s));
+ AMQFrame in(highestProtocolVersion, 999, new BasicConsumeOkBody(v, s));
in.encode(buffer);
buffer.flip();
AMQFrame out;
Modified: incubator/qpid/trunk/qpid/cpp/tests/InMemoryContentTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/InMemoryContentTest.cpp?view=diff&rev=494540&r1=494539&r2=494540
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/InMemoryContentTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/tests/InMemoryContentTest.cpp Tue Jan 9 11:44:50 2007
@@ -20,6 +20,7 @@
*/
#include <InMemoryContent.h>
#include <qpid_test_plugin.h>
+#include <AMQP_HighestVersion.h>
#include <iostream>
#include <list>
@@ -66,7 +67,7 @@
u_int16_t channel = 3;
addframes(content, inCount, in);
- content.send(&handler, channel, framesize);
+ content.send(highestProtocolVersion, &handler, channel, framesize);
check(handler, channel, outCount, out);
}
Modified: incubator/qpid/trunk/qpid/cpp/tests/LazyLoadedContentTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/LazyLoadedContentTest.cpp?view=diff&rev=494540&r1=494539&r2=494540
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/LazyLoadedContentTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/tests/LazyLoadedContentTest.cpp Tue Jan 9 11:44:50 2007
@@ -19,6 +19,7 @@
*
*/
#include <LazyLoadedContent.h>
+#include <AMQP_HighestVersion.h>
#include <NullMessageStore.h>
#include <qpid_test_plugin.h>
#include <iostream>
@@ -99,7 +100,7 @@
LazyLoadedContent content(&store, 0, in.size());
DummyHandler handler;
u_int16_t channel = 3;
- content.send(&handler, channel, framesize);
+ content.send(highestProtocolVersion, &handler, channel, framesize);
check(handler, channel, outCount, out);
}