You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/04/07 00:48:26 UTC
svn commit: r1310629 - in
/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core:
ActiveMQProducer.cpp kernels/ActiveMQConsumerKernel.cpp
kernels/ActiveMQProducerKernel.cpp kernels/ActiveMQSessionKernel.cpp
Author: tabish
Date: Fri Apr 6 22:48:26 2012
New Revision: 1310629
URL: http://svn.apache.org/viewvc?rev=1310629&view=rev
Log:
Fix some issues with the shutdown of kernels and pointer deletes.
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp?rev=1310629&r1=1310628&r2=1310629&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp Fri Apr 6 22:48:26 2012
@@ -33,8 +33,7 @@ using namespace decaf::lang::exceptions;
ActiveMQProducer::ActiveMQProducer(Pointer<ActiveMQProducerKernel> kernel) : kernel(kernel) {
if (kernel == NULL) {
- throw ActiveMQException(
- __FILE__, __LINE__,
+ throw ActiveMQException(__FILE__, __LINE__,
"ActiveMQProducer::ActiveMQProducer - Initialized with NULL Kernel");
}
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp?rev=1310629&r1=1310628&r2=1310629&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp Fri Apr 6 22:48:26 2012
@@ -206,7 +206,7 @@ namespace kernels {
public:
ClientAckHandler( ActiveMQSessionKernel* session ) : session(session) {
- if( session == NULL ) {
+ if (session == NULL) {
throw NullPointerException(
__FILE__, __LINE__, "Ack Handler Created with NULL Session.");
}
@@ -246,13 +246,13 @@ namespace kernels {
}
}
- void acknowledgeMessage( const commands::Message* message AMQCPP_UNUSED ) {
+ void acknowledgeMessage(const commands::Message* message AMQCPP_UNUSED ) {
try {
- if( this->dispatch != NULL ) {
- this->consumer->acknowledge( this->dispatch );
- this->dispatch.reset( NULL );
+ if (this->dispatch != NULL) {
+ this->consumer->acknowledge(this->dispatch);
+ this->dispatch.reset(NULL);
}
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -277,7 +277,7 @@ namespace kernels {
StartConsumerTask( ActiveMQConsumerKernel* consumer ) : Runnable(), consumer(NULL) {
- if( consumer == NULL ) {
+ if (consumer == NULL) {
throw NullPointerException(
__FILE__, __LINE__, "Synchronization Created with NULL Consumer.");
}
@@ -288,7 +288,7 @@ namespace kernels {
virtual ~StartConsumerTask() {}
virtual void run() {
- try{
+ try {
if(!this->consumer->isClosed()) {
this->consumer->start();
}
@@ -371,7 +371,7 @@ ActiveMQConsumerKernel::~ActiveMQConsume
try {
- try{
+ try {
this->close();
} catch(...) {}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp?rev=1310629&r1=1310628&r2=1310629&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp Fri Apr 6 22:48:26 2012
@@ -106,6 +106,8 @@ void ActiveMQProducerKernel::close() {
Pointer<RemoveInfo> info(new RemoveInfo);
info->setObjectId(this->producerInfo->getProducerId());
this->session->oneway(info);
+
+ this->closed = true;
}
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -122,6 +124,7 @@ void ActiveMQProducerKernel::dispose() {
producer.release();
throw;
}
+ producer.release();
this->closed = true;
}
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp?rev=1310629&r1=1310628&r2=1310629&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp Fri Apr 6 22:48:26 2012
@@ -124,7 +124,7 @@ namespace {
CloseSynhcronization(ActiveMQSessionKernel* session) : Synchronization(), session(session) {
- if(session == NULL) {
+ if (session == NULL) {
throw NullPointerException(
__FILE__, __LINE__, "Synchronization Created with NULL Session.");
}
@@ -225,7 +225,7 @@ ActiveMQSessionKernel::ActiveMQSessionKe
////////////////////////////////////////////////////////////////////////////////
ActiveMQSessionKernel::~ActiveMQSessionKernel() {
- try{
+ try {
// Destroy this session's resources
close();
}
@@ -246,11 +246,11 @@ void ActiveMQSessionKernel::fire(const a
void ActiveMQSessionKernel::close() {
// If we're already closed, just return.
- if( this->closed.get() ) {
+ if (this->closed.get()) {
return;
}
- if( this->transaction->isInXATransaction() ) {
+ if (this->transaction->isInXATransaction()) {
// TODO - Right now we don't have a safe way of dealing with this case
// since the session might be deleted before the XA Transaction is finalized
@@ -298,13 +298,13 @@ void ActiveMQSessionKernel::dispose() {
private:
- Finalizer( const Finalizer& );
- Finalizer& operator= ( const Finalizer& );
+ Finalizer(const Finalizer&);
+ Finalizer& operator=(const Finalizer&);
public:
Finalizer(ActiveMQSessionKernel* session, ActiveMQConnection* connection) :
- session( session ), connection( connection ) {
+ session(session), connection(connection) {
}
~Finalizer() {
@@ -314,6 +314,7 @@ void ActiveMQSessionKernel::dispose() {
} catch(...) {
session.release();
}
+ session.release();
this->session->closed = true;
}
};
@@ -351,10 +352,10 @@ void ActiveMQSessionKernel::dispose() {
// Dispose of all Producers, the dispose method skips the RemoveInfo command.
std::auto_ptr<Iterator<Pointer<ActiveMQProducerKernel> > > producerIter(this->config->producers.iterator());
- while( producerIter->hasNext() ) {
+ while (producerIter->hasNext()) {
try{
producerIter->next()->dispose();
- } catch( cms::CMSException& ex ){
+ } catch (cms::CMSException& ex) {
/* Absorb */
}
}
@@ -371,10 +372,8 @@ void ActiveMQSessionKernel::commit() {
this->checkClosed();
- if( !this->isTransacted() ) {
- throw ActiveMQException(
- __FILE__, __LINE__,
- "ActiveMQSessionKernel::commit - This Session is not Transacted");
+ if (!this->isTransacted()) {
+ throw ActiveMQException(__FILE__, __LINE__, "ActiveMQSessionKernel::commit - This Session is not Transacted");
}
// Commit the Transaction
@@ -386,14 +385,12 @@ void ActiveMQSessionKernel::commit() {
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSessionKernel::rollback() {
- try{
+ try {
this->checkClosed();
- if( !this->isTransacted() ) {
- throw ActiveMQException(
- __FILE__, __LINE__,
- "ActiveMQSessionKernel::rollback - This Session is not Transacted" );
+ if (!this->isTransacted()) {
+ throw ActiveMQException(__FILE__, __LINE__, "ActiveMQSessionKernel::rollback - This Session is not Transacted");
}
// Roll back the Transaction
@@ -405,7 +402,7 @@ void ActiveMQSessionKernel::rollback() {
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSessionKernel::recover() {
- try{
+ try {
checkClosed();
@@ -428,15 +425,15 @@ void ActiveMQSessionKernel::recover() {
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSessionKernel::clearMessagesInProgress() {
- if( this->executor.get() != NULL ) {
+ if (this->executor.get() != NULL) {
this->executor->clearMessagesInProgress();
}
- synchronized( &this->consumers ) {
+ synchronized(&this->consumers) {
std::vector< Pointer<ActiveMQConsumerKernel> > consumers = this->consumers.values();
std::vector< Pointer<ActiveMQConsumerKernel> >::iterator iter = consumers.begin();
- for( ; iter != consumers.end(); ++iter ) {
+ for (; iter != consumers.end(); ++iter) {
(*iter)->inProgressClearRequired();
this->connection->getScheduler()->executeAfterDelay(
@@ -448,11 +445,11 @@ void ActiveMQSessionKernel::clearMessage
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSessionKernel::acknowledge() {
- synchronized( &this->consumers ) {
+ synchronized(&this->consumers) {
std::vector< Pointer<ActiveMQConsumerKernel> > consumers = this->consumers.values();
std::vector< Pointer<ActiveMQConsumerKernel> >::iterator iter = consumers.begin();
- for( ; iter != consumers.end(); ++iter ) {
+ for (; iter != consumers.end(); ++iter) {
(*iter)->acknowledge();
}
}
@@ -461,20 +458,20 @@ void ActiveMQSessionKernel::acknowledge(
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSessionKernel::deliverAcks() {
- synchronized( &this->consumers ) {
+ synchronized(&this->consumers) {
std::vector< Pointer<ActiveMQConsumerKernel> > consumers = this->consumers.values();
std::vector< Pointer<ActiveMQConsumerKernel> >::iterator iter = consumers.begin();
- for( ; iter != consumers.end(); ++iter ) {
+ for (; iter != consumers.end(); ++iter) {
(*iter)->deliverAcks();
}
}
}
////////////////////////////////////////////////////////////////////////////////
-cms::MessageConsumer* ActiveMQSessionKernel::createConsumer( const cms::Destination* destination ) {
+cms::MessageConsumer* ActiveMQSessionKernel::createConsumer(const cms::Destination* destination) {
- try{
+ try {
this->checkClosed();
return this->createConsumer(destination, "", false);
}
@@ -482,10 +479,9 @@ cms::MessageConsumer* ActiveMQSessionKer
}
////////////////////////////////////////////////////////////////////////////////
-cms::MessageConsumer* ActiveMQSessionKernel::createConsumer( const cms::Destination* destination,
- const std::string& selector ) {
+cms::MessageConsumer* ActiveMQSessionKernel::createConsumer(const cms::Destination* destination, const std::string& selector) {
- try{
+ try {
this->checkClosed();
return this->createConsumer(destination, selector, false);
}
@@ -493,11 +489,10 @@ cms::MessageConsumer* ActiveMQSessionKer
}
////////////////////////////////////////////////////////////////////////////////
-cms::MessageConsumer* ActiveMQSessionKernel::createConsumer( const cms::Destination* destination,
- const std::string& selector,
- bool noLocal ) {
+cms::MessageConsumer* ActiveMQSessionKernel::createConsumer(const cms::Destination* destination,
+ const std::string& selector, bool noLocal) {
- try{
+ try {
this->checkClosed();
@@ -506,16 +501,14 @@ cms::MessageConsumer* ActiveMQSessionKer
const ActiveMQDestination* amqDestination =
dynamic_cast<const ActiveMQDestination*>( destination );
- if( amqDestination == NULL ) {
- throw ActiveMQException(
- __FILE__, __LINE__,
- "Destination was either NULL or not created by this CMS Client" );
+ if (amqDestination == NULL) {
+ throw ActiveMQException(__FILE__, __LINE__, "Destination was either NULL or not created by this CMS Client");
}
Pointer<ActiveMQDestination> dest( amqDestination->cloneDataStructure() );
int prefetch = 0;
- if( dest->isTopic() ) {
+ if (dest->isTopic()) {
prefetch = this->connection->getPrefetchPolicy()->getTopicPrefetch();
} else {
prefetch = this->connection->getPrefetchPolicy()->getQueuePrefetch();
@@ -545,27 +538,22 @@ cms::MessageConsumer* ActiveMQSessionKer
}
////////////////////////////////////////////////////////////////////////////////
-cms::MessageConsumer* ActiveMQSessionKernel::createDurableConsumer( const cms::Topic* destination,
- const std::string& name,
- const std::string& selector,
- bool noLocal ) {
+cms::MessageConsumer* ActiveMQSessionKernel::createDurableConsumer(const cms::Topic* destination, const std::string& name,
+ const std::string& selector, bool noLocal) {
- try{
+ try {
this->checkClosed();
// Cast the destination to an OpenWire destination, so we can
// get all the goodies.
- const ActiveMQDestination* amqDestination =
- dynamic_cast<const ActiveMQDestination*>( destination );
+ const ActiveMQDestination* amqDestination = dynamic_cast<const ActiveMQDestination*> (destination);
- if( amqDestination == NULL ) {
- throw ActiveMQException(
- __FILE__, __LINE__,
- "Destination was either NULL or not created by this CMS Client" );
+ if (amqDestination == NULL) {
+ throw ActiveMQException(__FILE__, __LINE__, "Destination was either NULL or not created by this CMS Client");
}
- Pointer<ActiveMQDestination> dest( amqDestination->cloneDataStructure() );
+ Pointer<ActiveMQDestination> dest(amqDestination->cloneDataStructure());
// Create the consumer instance.
Pointer<ActiveMQConsumerKernel> consumer(
@@ -594,7 +582,7 @@ cms::MessageConsumer* ActiveMQSessionKer
////////////////////////////////////////////////////////////////////////////////
cms::MessageProducer* ActiveMQSessionKernel::createProducer( const cms::Destination* destination ) {
- try{
+ try {
this->checkClosed();
@@ -622,7 +610,7 @@ cms::MessageProducer* ActiveMQSessionKer
Pointer<ActiveMQProducerKernel> producer( new ActiveMQProducerKernel(
this, this->getNextProducerId(), dest, this->connection->getSendTimeout() ) );
- try{
+ try {
this->addProducer(producer);
this->connection->oneway(producer->getProducerInfo());
} catch (Exception& ex) {
@@ -638,7 +626,7 @@ cms::MessageProducer* ActiveMQSessionKer
////////////////////////////////////////////////////////////////////////////////
cms::QueueBrowser* ActiveMQSessionKernel::createBrowser( const cms::Queue* queue ) {
- try{
+ try {
return ActiveMQSessionKernel::createBrowser(queue, "");
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -648,19 +636,17 @@ cms::QueueBrowser* ActiveMQSessionKernel
cms::QueueBrowser* ActiveMQSessionKernel::createBrowser(const cms::Queue* queue,
const std::string& selector) {
- try{
+ try {
this->checkClosed();
// Cast the destination to an OpenWire destination, so we can
// get all the goodies.
const ActiveMQDestination* amqDestination =
- dynamic_cast<const ActiveMQDestination*> (queue);
+ dynamic_cast<const ActiveMQDestination*>(queue);
if (amqDestination == NULL) {
- throw ActiveMQException(
- __FILE__, __LINE__,
- "Destination was either NULL or not created by this CMS Client" );
+ throw ActiveMQException(__FILE__, __LINE__, "Destination was either NULL or not created by this CMS Client");
}
Pointer<ActiveMQDestination> dest(amqDestination->cloneDataStructure());
@@ -676,9 +662,9 @@ cms::QueueBrowser* ActiveMQSessionKernel
}
////////////////////////////////////////////////////////////////////////////////
-cms::Queue* ActiveMQSessionKernel::createQueue( const std::string& queueName ) {
+cms::Queue* ActiveMQSessionKernel::createQueue(const std::string& queueName) {
- try{
+ try {
this->checkClosed();
@@ -693,9 +679,9 @@ cms::Queue* ActiveMQSessionKernel::creat
}
////////////////////////////////////////////////////////////////////////////////
-cms::Topic* ActiveMQSessionKernel::createTopic( const std::string& topicName ) {
+cms::Topic* ActiveMQSessionKernel::createTopic(const std::string& topicName) {
- try{
+ try {
this->checkClosed();
@@ -712,7 +698,7 @@ cms::Topic* ActiveMQSessionKernel::creat
////////////////////////////////////////////////////////////////////////////////
cms::TemporaryQueue* ActiveMQSessionKernel::createTemporaryQueue() {
- try{
+ try {
this->checkClosed();
@@ -730,7 +716,7 @@ cms::TemporaryQueue* ActiveMQSessionKern
////////////////////////////////////////////////////////////////////////////////
cms::TemporaryTopic* ActiveMQSessionKernel::createTemporaryTopic() {
- try{
+ try {
this->checkClosed();
@@ -748,8 +734,7 @@ cms::TemporaryTopic* ActiveMQSessionKern
////////////////////////////////////////////////////////////////////////////////
cms::Message* ActiveMQSessionKernel::createMessage() {
- try{
-
+ try {
this->checkClosed();
commands::ActiveMQMessage* message = new commands::ActiveMQMessage();
message->setConnection(this->connection);
@@ -761,8 +746,7 @@ cms::Message* ActiveMQSessionKernel::cre
////////////////////////////////////////////////////////////////////////////////
cms::BytesMessage* ActiveMQSessionKernel::createBytesMessage() {
- try{
-
+ try {
this->checkClosed();
commands::ActiveMQBytesMessage* message = new commands::ActiveMQBytesMessage();
message->setConnection(this->connection);
@@ -772,10 +756,9 @@ cms::BytesMessage* ActiveMQSessionKernel
}
////////////////////////////////////////////////////////////////////////////////
-cms::BytesMessage* ActiveMQSessionKernel::createBytesMessage( const unsigned char* bytes, int bytesSize ) {
-
- try{
+cms::BytesMessage* ActiveMQSessionKernel::createBytesMessage(const unsigned char* bytes, int bytesSize) {
+ try {
this->checkClosed();
cms::BytesMessage* msg = createBytesMessage();
msg->setBodyBytes(bytes, bytesSize);
@@ -787,8 +770,7 @@ cms::BytesMessage* ActiveMQSessionKernel
////////////////////////////////////////////////////////////////////////////////
cms::StreamMessage* ActiveMQSessionKernel::createStreamMessage() {
- try{
-
+ try {
this->checkClosed();
commands::ActiveMQStreamMessage* message = new commands::ActiveMQStreamMessage();
message->setConnection(this->connection);
@@ -800,8 +782,7 @@ cms::StreamMessage* ActiveMQSessionKerne
////////////////////////////////////////////////////////////////////////////////
cms::TextMessage* ActiveMQSessionKernel::createTextMessage() {
- try{
-
+ try {
this->checkClosed();
commands::ActiveMQTextMessage* message = new commands::ActiveMQTextMessage();
message->setConnection(this->connection);
@@ -811,10 +792,9 @@ cms::TextMessage* ActiveMQSessionKernel:
}
////////////////////////////////////////////////////////////////////////////////
-cms::TextMessage* ActiveMQSessionKernel::createTextMessage( const std::string& text ) {
+cms::TextMessage* ActiveMQSessionKernel::createTextMessage(const std::string& text) {
try {
-
this->checkClosed();
cms::TextMessage* msg = createTextMessage();
msg->setText(text.c_str());
@@ -826,8 +806,7 @@ cms::TextMessage* ActiveMQSessionKernel:
////////////////////////////////////////////////////////////////////////////////
cms::MapMessage* ActiveMQSessionKernel::createMapMessage() {
- try{
-
+ try {
this->checkClosed();
commands::ActiveMQMapMessage* message = new commands::ActiveMQMapMessage();
message->setConnection(this->connection);
@@ -915,7 +894,7 @@ void ActiveMQSessionKernel::send(cms::Me
////////////////////////////////////////////////////////////////////////////////
cms::ExceptionListener* ActiveMQSessionKernel::getExceptionListener() {
- if( connection != NULL ) {
+ if (connection != NULL) {
return connection->getExceptionListener();
}
@@ -930,7 +909,7 @@ Pointer<Scheduler> ActiveMQSessionKernel
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSessionKernel::unsubscribe(const std::string& name) {
- try{
+ try {
this->checkClosed();
@@ -950,7 +929,7 @@ void ActiveMQSessionKernel::unsubscribe(
void ActiveMQSessionKernel::dispatch(const Pointer<MessageDispatch>& dispatch) {
if (this->executor.get() != NULL) {
- this->executor->execute( dispatch );
+ this->executor->execute(dispatch);
}
}
@@ -961,7 +940,7 @@ void ActiveMQSessionKernel::redispatch(M
std::vector< Pointer<MessageDispatch> >::reverse_iterator iter = messages.rbegin();
for (; iter != messages.rend(); ++iter) {
- executor->executeFirst( *iter );
+ executor->executeFirst(*iter);
}
}
@@ -1056,7 +1035,7 @@ std::string ActiveMQSessionKernel::creat
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSessionKernel::oneway(Pointer<Command> command) {
- try{
+ try {
this->checkClosed();
this->connection->oneway(command);
}
@@ -1068,7 +1047,7 @@ void ActiveMQSessionKernel::oneway(Point
////////////////////////////////////////////////////////////////////////////////
Pointer<Response> ActiveMQSessionKernel::syncRequest(Pointer<Command> command, unsigned int timeout) {
- try{
+ try {
this->checkClosed();
return this->connection->syncRequest(command, timeout);
}
@@ -1079,17 +1058,15 @@ Pointer<Response> ActiveMQSessionKernel:
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSessionKernel::checkClosed() const {
- if( this->closed.get() ) {
- throw ActiveMQException(
- __FILE__, __LINE__,
- "ActiveMQSessionKernel - Session Already Closed" );
+ if (this->closed.get()) {
+ throw ActiveMQException(__FILE__, __LINE__, "ActiveMQSessionKernel - Session Already Closed");
}
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSessionKernel::addConsumer(Pointer<ActiveMQConsumerKernel> consumer) {
- try{
+ try {
this->checkClosed();
@@ -1109,7 +1086,7 @@ void ActiveMQSessionKernel::addConsumer(
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSessionKernel::removeConsumer(const Pointer<ConsumerId>& consumerId) {
- try{
+ try {
this->checkClosed();
@@ -1130,13 +1107,9 @@ void ActiveMQSessionKernel::removeConsum
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSessionKernel::addProducer(Pointer<ActiveMQProducerKernel> producer) {
- try{
-
+ try {
this->checkClosed();
-
this->config->producers.add(producer);
-
- // Add to the Connections list
this->connection->addProducer(producer);
}
AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
@@ -1147,10 +1120,8 @@ void ActiveMQSessionKernel::addProducer(
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSessionKernel::removeProducer(Pointer<ActiveMQProducerKernel> producer) {
- try{
-
+ try {
this->checkClosed();
-
this->connection->removeProducer(producer->getProducerId());
this->config->producers.remove(producer);
}