You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cc...@apache.org on 2006/12/14 22:38:28 UTC

svn commit: r487359 - in /incubator/qpid/trunk/qpid/cpp/lib/broker: SessionHandlerImpl.cpp SessionHandlerImpl.h

Author: cctrieloff
Date: Thu Dec 14 13:38:28 2006
New Revision: 487359

URL: http://svn.apache.org/viewvc?view=rev&rev=487359
Log:

Broker side dynamic version hook up.


Modified:
    incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.h

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp?view=diff&rev=487359&r1=487358&r2=487359
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp Thu Dec 14 13:38:28 2006
@@ -37,9 +37,6 @@
                                        AutoDelete* _cleaner,
                                        const Settings& _settings) :
     context(_context), 
-// AMQP version management change - kpvdr 2006-11-17
-// TODO: Make this class version-aware and link these hard-wired numbers to that version
-    client(context, 8, 0),
     queues(_queues), 
     exchanges(_exchanges),
     cleaner(_cleaner),
@@ -51,9 +48,18 @@
     queueHandler(new QueueHandlerImpl(this)),
     txHandler(new TxHandlerImpl(this)),
     framemax(65536), 
-    heartbeat(0) {}
+    heartbeat(0) 
+    {
+    
+    client =NULL;    
+}
 
-SessionHandlerImpl::~SessionHandlerImpl(){}
+SessionHandlerImpl::~SessionHandlerImpl(){
+
+   if (client != NULL)
+    	delete client;
+
+}
 
 Channel* SessionHandlerImpl::getChannel(u_int16_t channel){
     channel_iterator i = channels.find(channel);
@@ -96,12 +102,12 @@
         }catch(ChannelException& e){
             channels[channel]->close();
             channels.erase(channel);
-            client.getChannel().close(channel, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
+            client->getChannel().close(channel, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
         }catch(ConnectionException& e){
-            client.getConnection().close(0, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
+            client->getConnection().close(0, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
         }catch(std::exception& e){
             string error(e.what());
-            client.getConnection().close(0, 541/*internal error*/, error, method->amqpClassId(), method->amqpMethodId());
+            client->getConnection().close(0, 541/*internal error*/, error, method->amqpClassId(), method->amqpMethodId());
         }
 	break;
 
@@ -120,14 +126,21 @@
     }
 }
 
-void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* /*header*/){
-    //send connection start
-    FieldTable properties;
-    string mechanisms("PLAIN");
-    string locales("en_US");
-    client.getConnection().start(0, 8, 0, properties, mechanisms, locales);
+void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* header){
+
+  if (client == NULL)
+    {
+    	client = new qpid::framing::AMQP_ClientProxy(context, header->getMajor(), header->getMinor());
+  
+    	//send connection start
+    	FieldTable properties;
+    	string mechanisms("PLAIN");
+    	string locales("en_US");    // channel, majour, minor
+      	client->getConnection().start(0, header->getMajor(), header->getMinor(), properties, mechanisms, locales);
+    }
 }
 
+
 void SessionHandlerImpl::idleOut(){
 
 }
@@ -169,8 +182,7 @@
 void SessionHandlerImpl::ConnectionHandlerImpl::startOk(
     u_int16_t /*channel*/, const FieldTable& /*clientProperties*/, const string& /*mechanism*/, 
     const string& /*response*/, const string& /*locale*/){
-
-    parent->client.getConnection().tune(0, 100, parent->framemax, parent->heartbeat);
+    parent->client->getConnection().tune(0, 100, parent->framemax, parent->heartbeat);
 }
         
 void SessionHandlerImpl::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, const string& /*response*/){}
@@ -182,14 +194,14 @@
         
 void SessionHandlerImpl::ConnectionHandlerImpl::open(u_int16_t /*channel*/, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){
     string knownhosts;
-    parent->client.getConnection().openOk(0, knownhosts);
+    parent->client->getConnection().openOk(0, knownhosts);
 }
         
 void SessionHandlerImpl::ConnectionHandlerImpl::close(
     u_int16_t /*channel*/, u_int16_t /*replyCode*/, const string& /*replyText*/, 
     u_int16_t /*classId*/, u_int16_t /*methodId*/)
 {
-    parent->client.getConnection().closeOk(0);
+    parent->client->getConnection().closeOk(0);
     parent->context->close();
 } 
         
@@ -202,7 +214,7 @@
 void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, const string& /*outOfBand*/){
     parent->channels[channel] = new Channel(parent->context, channel, parent->framemax, 
                                             parent->queues->getStore(), parent->settings.stagingThreshold);
-    parent->client.getChannel().openOk(channel);
+    parent->client->getChannel().openOk(channel);
 } 
         
 void SessionHandlerImpl::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){}         
@@ -215,7 +227,7 @@
         parent->channels.erase(channel);
         c->close();
         delete c;
-        parent->client.getChannel().closeOk(channel);
+        parent->client->getChannel().closeOk(channel);
     }
 } 
         
@@ -242,17 +254,17 @@
             throw ConnectionException(503, "Exchange type not implemented: " + type);
         }
     }
-    
     if(!nowait){
-        parent->client.getExchange().declareOk(channel);
+        parent->client->getExchange().declareOk(channel);
     }
 } 
                 
 void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, 
                                                       const string& exchange, bool /*ifUnused*/, bool nowait){
+
     //TODO: implement unused
     parent->exchanges->destroy(exchange);
-    if(!nowait) parent->client.getExchange().deleteOk(channel);
+    if(!nowait) parent->client->getExchange().deleteOk(channel);
 } 
 
 void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name, 
@@ -286,7 +298,7 @@
     }
     if (!nowait) {
         string queueName = queue->getName();
-        parent->client.getQueue().declareOk(channel, queueName, queue->getMessageCount(), queue->getConsumerCount());
+        parent->client->getQueue().declareOk(channel, queueName, queue->getMessageCount(), queue->getConsumerCount());
     }
 } 
         
@@ -302,7 +314,7 @@
 //        exchange->bind(queue, routingKey, &arguments);
         string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
         exchange->bind(queue, exchangeRoutingKey, &arguments);
-        if(!nowait) parent->client.getQueue().bindOk(channel);    
+        if(!nowait) parent->client->getQueue().bindOk(channel);    
     }else{
         throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName);
     }
@@ -312,7 +324,7 @@
 
     Queue::shared_ptr queue = parent->getQueue(queueName, channel);
     int count = queue->purge();
-    if(!nowait) parent->client.getQueue().purgeOk(channel, count);
+    if(!nowait) parent->client->getQueue().purgeOk(channel, count);
 } 
         
 void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& queue, 
@@ -334,7 +346,8 @@
         q->destroy();
         parent->queues->destroy(queue);
     }
-    if(!nowait) parent->client.getQueue().deleteOk(channel, count);
+
+    if(!nowait) parent->client->getQueue().deleteOk(channel, count);
 } 
               
         
@@ -344,7 +357,7 @@
     //TODO: handle global
     parent->getChannel(channel)->setPrefetchSize(prefetchSize);
     parent->getChannel(channel)->setPrefetchCount(prefetchCount);
-    parent->client.getBasic().qosOk(channel);
+    parent->client->getBasic().qosOk(channel);
 } 
         
 void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_t /*ticket*/, 
@@ -361,7 +374,8 @@
     try{
         string newTag = consumerTag;
         channel->consume(newTag, queue, !noAck, exclusive, noLocal ? parent : 0);
-        if(!nowait) parent->client.getBasic().consumeOk(channelId, newTag);
+
+        if(!nowait) parent->client->getBasic().consumeOk(channelId, newTag);
 
         //allow messages to be dispatched if required as there is now a consumer:
         queue->dispatch();
@@ -374,7 +388,8 @@
         
 void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, const string& consumerTag, bool nowait){
     parent->getChannel(channel)->cancel(consumerTag);
-    if(!nowait) parent->client.getBasic().cancelOk(channel, consumerTag);
+
+    if(!nowait) parent->client->getBasic().cancelOk(channel, consumerTag);
 } 
         
 void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/, 
@@ -394,7 +409,8 @@
     Queue::shared_ptr queue = parent->getQueue(queueName, channelId);    
     if(!parent->getChannel(channelId)->get(queue, !noAck)){
         string clusterId;//not used, part of an imatix hack
-        parent->client.getBasic().getEmpty(channelId, clusterId);
+
+        parent->client->getBasic().getEmpty(channelId, clusterId);
     }
 } 
         
@@ -414,17 +430,18 @@
 
 void SessionHandlerImpl::TxHandlerImpl::select(u_int16_t channel){
     parent->getChannel(channel)->begin();
-    parent->client.getTx().selectOk(channel);
+    parent->client->getTx().selectOk(channel);
 }
 
 void SessionHandlerImpl::TxHandlerImpl::commit(u_int16_t channel){
     parent->getChannel(channel)->commit();
-    parent->client.getTx().commitOk(channel);
+    parent->client->getTx().commitOk(channel);
 }
 
 void SessionHandlerImpl::TxHandlerImpl::rollback(u_int16_t channel){
+    
     parent->getChannel(channel)->rollback();
-    parent->client.getTx().rollbackOk(channel);
+    parent->client->getTx().rollbackOk(channel);
     parent->getChannel(channel)->recover(false);    
 }
               

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.h?view=diff&rev=487359&r1=487358&r2=487359
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.h Thu Dec 14 13:38:28 2006
@@ -76,7 +76,7 @@
     typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
 
     qpid::sys::SessionContext* context;
-    qpid::framing::AMQP_ClientProxy client;
+    qpid::framing::AMQP_ClientProxy* client;
     QueueRegistry* queues;
     ExchangeRegistry* const exchanges;
     AutoDelete* const cleaner;