You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cc...@apache.org on 2006/12/14 22:38:28 UTC
svn commit: r487359 - in /incubator/qpid/trunk/qpid/cpp/lib/broker:
SessionHandlerImpl.cpp SessionHandlerImpl.h
Author: cctrieloff
Date: Thu Dec 14 13:38:28 2006
New Revision: 487359
URL: http://svn.apache.org/viewvc?view=rev&rev=487359
Log:
Broker side dynamic version hook up.
Modified:
incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp
incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.h
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp?view=diff&rev=487359&r1=487358&r2=487359
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp Thu Dec 14 13:38:28 2006
@@ -37,9 +37,6 @@
AutoDelete* _cleaner,
const Settings& _settings) :
context(_context),
-// AMQP version management change - kpvdr 2006-11-17
-// TODO: Make this class version-aware and link these hard-wired numbers to that version
- client(context, 8, 0),
queues(_queues),
exchanges(_exchanges),
cleaner(_cleaner),
@@ -51,9 +48,18 @@
queueHandler(new QueueHandlerImpl(this)),
txHandler(new TxHandlerImpl(this)),
framemax(65536),
- heartbeat(0) {}
+ heartbeat(0)
+ {
+
+ client =NULL;
+}
-SessionHandlerImpl::~SessionHandlerImpl(){}
+SessionHandlerImpl::~SessionHandlerImpl(){
+
+ if (client != NULL)
+ delete client;
+
+}
Channel* SessionHandlerImpl::getChannel(u_int16_t channel){
channel_iterator i = channels.find(channel);
@@ -96,12 +102,12 @@
}catch(ChannelException& e){
channels[channel]->close();
channels.erase(channel);
- client.getChannel().close(channel, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
+ client->getChannel().close(channel, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
}catch(ConnectionException& e){
- client.getConnection().close(0, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
+ client->getConnection().close(0, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
}catch(std::exception& e){
string error(e.what());
- client.getConnection().close(0, 541/*internal error*/, error, method->amqpClassId(), method->amqpMethodId());
+ client->getConnection().close(0, 541/*internal error*/, error, method->amqpClassId(), method->amqpMethodId());
}
break;
@@ -120,14 +126,21 @@
}
}
-void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* /*header*/){
- //send connection start
- FieldTable properties;
- string mechanisms("PLAIN");
- string locales("en_US");
- client.getConnection().start(0, 8, 0, properties, mechanisms, locales);
+void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* header){
+
+ if (client == NULL)
+ {
+ client = new qpid::framing::AMQP_ClientProxy(context, header->getMajor(), header->getMinor());
+
+ //send connection start
+ FieldTable properties;
+ string mechanisms("PLAIN");
+ string locales("en_US"); // channel, majour, minor
+ client->getConnection().start(0, header->getMajor(), header->getMinor(), properties, mechanisms, locales);
+ }
}
+
void SessionHandlerImpl::idleOut(){
}
@@ -169,8 +182,7 @@
void SessionHandlerImpl::ConnectionHandlerImpl::startOk(
u_int16_t /*channel*/, const FieldTable& /*clientProperties*/, const string& /*mechanism*/,
const string& /*response*/, const string& /*locale*/){
-
- parent->client.getConnection().tune(0, 100, parent->framemax, parent->heartbeat);
+ parent->client->getConnection().tune(0, 100, parent->framemax, parent->heartbeat);
}
void SessionHandlerImpl::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, const string& /*response*/){}
@@ -182,14 +194,14 @@
void SessionHandlerImpl::ConnectionHandlerImpl::open(u_int16_t /*channel*/, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){
string knownhosts;
- parent->client.getConnection().openOk(0, knownhosts);
+ parent->client->getConnection().openOk(0, knownhosts);
}
void SessionHandlerImpl::ConnectionHandlerImpl::close(
u_int16_t /*channel*/, u_int16_t /*replyCode*/, const string& /*replyText*/,
u_int16_t /*classId*/, u_int16_t /*methodId*/)
{
- parent->client.getConnection().closeOk(0);
+ parent->client->getConnection().closeOk(0);
parent->context->close();
}
@@ -202,7 +214,7 @@
void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, const string& /*outOfBand*/){
parent->channels[channel] = new Channel(parent->context, channel, parent->framemax,
parent->queues->getStore(), parent->settings.stagingThreshold);
- parent->client.getChannel().openOk(channel);
+ parent->client->getChannel().openOk(channel);
}
void SessionHandlerImpl::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){}
@@ -215,7 +227,7 @@
parent->channels.erase(channel);
c->close();
delete c;
- parent->client.getChannel().closeOk(channel);
+ parent->client->getChannel().closeOk(channel);
}
}
@@ -242,17 +254,17 @@
throw ConnectionException(503, "Exchange type not implemented: " + type);
}
}
-
if(!nowait){
- parent->client.getExchange().declareOk(channel);
+ parent->client->getExchange().declareOk(channel);
}
}
void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/,
const string& exchange, bool /*ifUnused*/, bool nowait){
+
//TODO: implement unused
parent->exchanges->destroy(exchange);
- if(!nowait) parent->client.getExchange().deleteOk(channel);
+ if(!nowait) parent->client->getExchange().deleteOk(channel);
}
void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name,
@@ -286,7 +298,7 @@
}
if (!nowait) {
string queueName = queue->getName();
- parent->client.getQueue().declareOk(channel, queueName, queue->getMessageCount(), queue->getConsumerCount());
+ parent->client->getQueue().declareOk(channel, queueName, queue->getMessageCount(), queue->getConsumerCount());
}
}
@@ -302,7 +314,7 @@
// exchange->bind(queue, routingKey, &arguments);
string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
exchange->bind(queue, exchangeRoutingKey, &arguments);
- if(!nowait) parent->client.getQueue().bindOk(channel);
+ if(!nowait) parent->client->getQueue().bindOk(channel);
}else{
throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName);
}
@@ -312,7 +324,7 @@
Queue::shared_ptr queue = parent->getQueue(queueName, channel);
int count = queue->purge();
- if(!nowait) parent->client.getQueue().purgeOk(channel, count);
+ if(!nowait) parent->client->getQueue().purgeOk(channel, count);
}
void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& queue,
@@ -334,7 +346,8 @@
q->destroy();
parent->queues->destroy(queue);
}
- if(!nowait) parent->client.getQueue().deleteOk(channel, count);
+
+ if(!nowait) parent->client->getQueue().deleteOk(channel, count);
}
@@ -344,7 +357,7 @@
//TODO: handle global
parent->getChannel(channel)->setPrefetchSize(prefetchSize);
parent->getChannel(channel)->setPrefetchCount(prefetchCount);
- parent->client.getBasic().qosOk(channel);
+ parent->client->getBasic().qosOk(channel);
}
void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_t /*ticket*/,
@@ -361,7 +374,8 @@
try{
string newTag = consumerTag;
channel->consume(newTag, queue, !noAck, exclusive, noLocal ? parent : 0);
- if(!nowait) parent->client.getBasic().consumeOk(channelId, newTag);
+
+ if(!nowait) parent->client->getBasic().consumeOk(channelId, newTag);
//allow messages to be dispatched if required as there is now a consumer:
queue->dispatch();
@@ -374,7 +388,8 @@
void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, const string& consumerTag, bool nowait){
parent->getChannel(channel)->cancel(consumerTag);
- if(!nowait) parent->client.getBasic().cancelOk(channel, consumerTag);
+
+ if(!nowait) parent->client->getBasic().cancelOk(channel, consumerTag);
}
void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/,
@@ -394,7 +409,8 @@
Queue::shared_ptr queue = parent->getQueue(queueName, channelId);
if(!parent->getChannel(channelId)->get(queue, !noAck)){
string clusterId;//not used, part of an imatix hack
- parent->client.getBasic().getEmpty(channelId, clusterId);
+
+ parent->client->getBasic().getEmpty(channelId, clusterId);
}
}
@@ -414,17 +430,18 @@
void SessionHandlerImpl::TxHandlerImpl::select(u_int16_t channel){
parent->getChannel(channel)->begin();
- parent->client.getTx().selectOk(channel);
+ parent->client->getTx().selectOk(channel);
}
void SessionHandlerImpl::TxHandlerImpl::commit(u_int16_t channel){
parent->getChannel(channel)->commit();
- parent->client.getTx().commitOk(channel);
+ parent->client->getTx().commitOk(channel);
}
void SessionHandlerImpl::TxHandlerImpl::rollback(u_int16_t channel){
+
parent->getChannel(channel)->rollback();
- parent->client.getTx().rollbackOk(channel);
+ parent->client->getTx().rollbackOk(channel);
parent->getChannel(channel)->recover(false);
}
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.h?view=diff&rev=487359&r1=487358&r2=487359
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.h Thu Dec 14 13:38:28 2006
@@ -76,7 +76,7 @@
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
qpid::sys::SessionContext* context;
- qpid::framing::AMQP_ClientProxy client;
+ qpid::framing::AMQP_ClientProxy* client;
QueueRegistry* queues;
ExchangeRegistry* const exchanges;
AutoDelete* const cleaner;