You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/10/24 22:09:00 UTC

svn commit: r1401852 - /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp

Author: tabish
Date: Wed Oct 24 20:08:59 2012
New Revision: 1401852

URL: http://svn.apache.org/viewvc?rev=1401852&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQCPP-405

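Account for the advisory consumer in transport interruption processing: when topic advisories are being watched, the dispatcher count used to size the interruption-processing latch is reduced by one.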

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=1401852&r1=1401851&r2=1401852&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Wed Oct 24 20:08:59 2012
@@ -408,12 +408,8 @@ namespace core{
 ////////////////////////////////////////////////////////////////////////////////
 ActiveMQConnection::ActiveMQConnection(const Pointer<transport::Transport> transport,
                                        const Pointer<decaf::util::Properties> properties) :
-    config(NULL),
-    connectionMetaData(new ActiveMQConnectionMetaData()),
-    started(false),
-    closed(false),
-    closing(false),
-    transportFailed(false) {
+    config(NULL), connectionMetaData(new ActiveMQConnectionMetaData()), started(false),
+    closed(false), closing(false), transportFailed(false) {
 
     Pointer<ConnectionConfig> configuration(
             new ConnectionConfig(transport, properties));
@@ -432,28 +428,26 @@ ActiveMQConnection::ActiveMQConnection(c
 ActiveMQConnection::~ActiveMQConnection() {
     try {
 
-        try{
+        try {
             this->close();
-        } catch(...) {}
+        }
+        AMQ_CATCHALL_NOTHROW()
 
         // This must happen even if exceptions occur in the Close attempt.
         delete this->config;
     }
-    AMQ_CATCH_NOTHROW( ActiveMQException )
-    AMQ_CATCHALL_NOTHROW( )
+    AMQ_CATCHALL_NOTHROW()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::addDispatcher(
-    const decaf::lang::Pointer<ConsumerId>& consumer, Dispatcher* dispatcher) {
+void ActiveMQConnection::addDispatcher(const decaf::lang::Pointer<ConsumerId>& consumer, Dispatcher* dispatcher) {
 
-    try{
+    try {
         synchronized(&this->config->dispatchers) {
             this->config->dispatchers.put(consumer, dispatcher);
         }
     }
-    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
-}
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()}
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnection::removeDispatcher(const decaf::lang::Pointer<ConsumerId>& consumer) {
@@ -463,8 +457,7 @@ void ActiveMQConnection::removeDispatche
             this->config->dispatchers.remove(consumer);
         }
     }
-    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
-}
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()}
 
 ////////////////////////////////////////////////////////////////////////////////
 cms::Session* ActiveMQConnection::createSession() {
@@ -485,8 +478,7 @@ cms::Session* ActiveMQConnection::create
         // Create the session instance as a Session Kernel we then create and return a
         // ActiveMQSession instance that acts as a proxy to the kernel caller can delete
         // that at any time since we only refer to the Pointer to the session kernel.
-        Pointer<ActiveMQSessionKernel> session(new ActiveMQSessionKernel(
-            this, getNextSessionId(), ackMode, *this->config->properties));
+        Pointer<ActiveMQSessionKernel> session(new ActiveMQSessionKernel(this, getNextSessionId(), ackMode, *this->config->properties));
 
         session->setMessageTransformer(this->config->transformer);
 
@@ -570,7 +562,7 @@ std::string ActiveMQConnection::getClien
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::setClientID( const std::string& clientID ) {
+void ActiveMQConnection::setClientID(const std::string& clientID) {
 
     if (this->closed.get()) {
         throw cms::IllegalStateException("Connection is already closed", NULL);
@@ -666,7 +658,7 @@ void ActiveMQConnection::close() {
 
             this->config->activeSessions.clear();
             this->config->sessionsLock.writeLock().unlock();
-        } catch(Exception& error) {
+        } catch (Exception& error) {
             this->config->sessionsLock.writeLock().unlock();
             if (!hasException) {
                 ex = error;
@@ -677,8 +669,7 @@ void ActiveMQConnection::close() {
 
         // As TemporaryQueue and TemporaryTopic instances are bound to a connection
         // we should just delete them after the connection is closed to free up memory
-        Pointer< Iterator< Pointer< ActiveMQTempDestination> > > iterator(
-            this->config->activeTempDestinations.values().iterator());
+        Pointer<Iterator<Pointer<ActiveMQTempDestination> > > iterator(this->config->activeTempDestinations.values().iterator());
 
         try {
             while (iterator->hasNext()) {
@@ -761,14 +752,14 @@ void ActiveMQConnection::cleanup() {
         try {
             // We need to use a copy since we aren't able to use CopyOnWriteArrayList
             ArrayList<Pointer<ActiveMQSessionKernel> > sessions(this->config->activeSessions);
-            std::auto_ptr< Iterator< Pointer<ActiveMQSessionKernel> > > iter(sessions.iterator());
+            std::auto_ptr<Iterator<Pointer<ActiveMQSessionKernel> > > iter(sessions.iterator());
 
             // Dispose of all the Session resources we know are still open.
             while (iter->hasNext()) {
                 Pointer<ActiveMQSessionKernel> session = iter->next();
-                try{
+                try {
                     session->dispose();
-                } catch( cms::CMSException& ex ){
+                } catch (cms::CMSException& ex) {
                     /* Absorb */
                 }
             }
@@ -813,7 +804,7 @@ void ActiveMQConnection::start() {
                 this->config->sessionsLock.readLock().lock();
 
                 // Start all the sessions.
-                std::auto_ptr<Iterator< Pointer<ActiveMQSessionKernel> > > iter(this->config->activeSessions.iterator());
+                std::auto_ptr<Iterator<Pointer<ActiveMQSessionKernel> > > iter(this->config->activeSessions.iterator());
                 while (iter->hasNext()) {
                     iter->next()->start();
                 }
@@ -840,7 +831,7 @@ void ActiveMQConnection::stop() {
             // new messages.
             if (this->started.compareAndSet(true, false)) {
                 this->config->sessionsLock.readLock().lock();
-                std::auto_ptr<Iterator< Pointer<ActiveMQSessionKernel> > > iter(this->config->activeSessions.iterator());
+                std::auto_ptr<Iterator<Pointer<ActiveMQSessionKernel> > > iter(this->config->activeSessions.iterator());
 
                 while (iter->hasNext()) {
                     iter->next()->stop();
@@ -916,9 +907,9 @@ void ActiveMQConnection::disconnect(long
             throw e;
         }
     }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
+    AMQ_CATCH_RETHROW(ActiveMQException)
+    AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
+    AMQ_CATCHALL_THROW(ActiveMQException)
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -926,7 +917,7 @@ void ActiveMQConnection::sendPullRequest
 
     try {
 
-         if (consumer->getPrefetchSize() == 0) {
+        if (consumer->getPrefetchSize() == 0) {
 
             Pointer<MessagePull> messagePull(new MessagePull());
             messagePull->setConsumerId(consumer->getConsumerId());
@@ -936,9 +927,9 @@ void ActiveMQConnection::sendPullRequest
             this->oneway(messagePull);
         }
     }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
+    AMQ_CATCH_RETHROW(ActiveMQException)
+    AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
+    AMQ_CATCHALL_THROW(ActiveMQException)
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -957,16 +948,16 @@ void ActiveMQConnection::destroyDestinat
 
         command->setConnectionId(this->config->connectionInfo->getConnectionId());
         command->setOperationType(ActiveMQConstants::DESTINATION_REMOVE_OPERATION);
-        command->setDestination(Pointer<ActiveMQDestination> (destination->cloneDataStructure()));
+        command->setDestination(Pointer<ActiveMQDestination>(destination->cloneDataStructure()));
 
         // Send the message to the broker.
         syncRequest(command);
     }
-    AMQ_CATCH_RETHROW( NullPointerException )
-    AMQ_CATCH_RETHROW( decaf::lang::exceptions::IllegalStateException )
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
+    AMQ_CATCH_RETHROW(NullPointerException)
+    AMQ_CATCH_RETHROW(decaf::lang::exceptions::IllegalStateException)
+    AMQ_CATCH_RETHROW(ActiveMQException)
+    AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
+    AMQ_CATCHALL_THROW(ActiveMQException)
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -981,16 +972,15 @@ void ActiveMQConnection::destroyDestinat
         checkClosedOrFailed();
         ensureConnectionInfoSent();
 
-        const ActiveMQDestination* amqDestination =
-            dynamic_cast<const ActiveMQDestination*> (destination);
+        const ActiveMQDestination* amqDestination = dynamic_cast<const ActiveMQDestination*>(destination);
 
         this->destroyDestination(amqDestination);
     }
-    AMQ_CATCH_RETHROW( NullPointerException )
-    AMQ_CATCH_RETHROW( decaf::lang::exceptions::IllegalStateException )
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
+    AMQ_CATCH_RETHROW(NullPointerException)
+    AMQ_CATCH_RETHROW(decaf::lang::exceptions::IllegalStateException)
+    AMQ_CATCH_RETHROW(ActiveMQException)
+    AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
+    AMQ_CATCHALL_THROW(ActiveMQException)
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -1031,7 +1021,7 @@ void ActiveMQConnection::onCommand(const
 
         } else if (command->isProducerAck()) {
 
-            ProducerAck* producerAck = dynamic_cast<ProducerAck*>( command.get() );
+            ProducerAck* producerAck = dynamic_cast<ProducerAck*>(command.get());
 
             // Get the consumer info object for this consumer.
             Pointer<ActiveMQProducerKernel> producer;
@@ -1061,17 +1051,18 @@ void ActiveMQConnection::onCommand(const
         }
 
         synchronized(&this->config->transportListeners) {
-            Pointer< Iterator<TransportListener*> > iter(this->config->transportListeners.iterator());
+            Pointer<Iterator<TransportListener*> > iter(this->config->transportListeners.iterator());
             while (iter->hasNext()) {
-                try{
+                try {
                     iter->next()->onCommand(command);
-                } catch(...) {}
+                } catch (...) {
+                }
             }
         }
     }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
+    AMQ_CATCH_RETHROW(ActiveMQException)
+    AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
+    AMQ_CATCHALL_THROW(ActiveMQException)
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -1092,7 +1083,7 @@ void ActiveMQConnection::onConsumerContr
     this->config->sessionsLock.readLock().lock();
     try {
         // Get the complete list of active sessions.
-        std::auto_ptr< Iterator< Pointer<ActiveMQSessionKernel> > > iter( this->config->activeSessions.iterator() );
+        std::auto_ptr<Iterator<Pointer<ActiveMQSessionKernel> > > iter(this->config->activeSessions.iterator());
 
         while (iter->hasNext()) {
             Pointer<ActiveMQSessionKernel> session = iter->next();
@@ -1110,7 +1101,7 @@ void ActiveMQConnection::onConsumerContr
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::onException( const decaf::lang::Exception& ex ) {
+void ActiveMQConnection::onException(const decaf::lang::Exception& ex) {
 
     try {
 
@@ -1121,8 +1112,8 @@ void ActiveMQConnection::onException( co
             this->config->executor->execute(new OnExceptionRunnable(this, config, ex.clone()));
         }
     }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
+    AMQ_CATCH_RETHROW(ActiveMQException)
+    AMQ_CATCHALL_THROW(ActiveMQException)
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -1139,7 +1130,7 @@ void ActiveMQConnection::onAsyncExceptio
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnection::onClientInternalException(const decaf::lang::Exception& ex) {
 
-    if ( !closed.get() && !closing.get() ) {
+    if (!closed.get() && !closing.get()) {
 
         if (this->config->exceptionListener != NULL) {
             this->config->executor->execute(new OnAsyncExceptionRunnable(this, ex));
@@ -1152,12 +1143,13 @@ void ActiveMQConnection::onClientInterna
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnection::transportInterrupted() {
 
-    this->config->transportInterruptionProcessingComplete.reset(
-        new CountDownLatch( (int)this->config->dispatchers.size() ) );
+    int consumers = this->config->watchTopicAdvisories ? (int) this->config->dispatchers.size() - 1 : (int) this->config->dispatchers.size();
+
+    this->config->transportInterruptionProcessingComplete.reset(new CountDownLatch(consumers));
 
     this->config->sessionsLock.readLock().lock();
     try {
-        std::auto_ptr< Iterator< Pointer<ActiveMQSessionKernel> > > sessions(this->config->activeSessions.iterator());
+        std::auto_ptr<Iterator<Pointer<ActiveMQSessionKernel> > > sessions(this->config->activeSessions.iterator());
         while (sessions->hasNext()) {
             sessions->next()->clearMessagesInProgress();
         }
@@ -1168,11 +1160,12 @@ void ActiveMQConnection::transportInterr
     }
 
     synchronized(&this->config->transportListeners) {
-        Pointer< Iterator<TransportListener*> > listeners(this->config->transportListeners.iterator());
+        Pointer<Iterator<TransportListener*> > listeners(this->config->transportListeners.iterator());
         while (listeners->hasNext()) {
-            try{
+            try {
                 listeners->next()->transportInterrupted();
-            } catch(...) {}
+            } catch (...) {
+            }
         }
     }
 }
@@ -1181,11 +1174,12 @@ void ActiveMQConnection::transportInterr
 void ActiveMQConnection::transportResumed() {
 
     synchronized(&this->config->transportListeners) {
-        Pointer< Iterator<TransportListener*> > iter( this->config->transportListeners.iterator() );
-        while( iter->hasNext() ) {
-            try{
+        Pointer<Iterator<TransportListener*> > iter(this->config->transportListeners.iterator());
+        while (iter->hasNext()) {
+            try {
                 iter->next()->transportResumed();
-            } catch(...) {}
+            } catch (...) {
+            }
         }
     }
 }
@@ -1197,10 +1191,10 @@ void ActiveMQConnection::oneway(Pointer<
         checkClosedOrFailed();
         this->config->transport->oneway(command);
     }
-    AMQ_CATCH_EXCEPTION_CONVERT( IOException, ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( decaf::lang::exceptions::UnsupportedOperationException, ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT(IOException, ActiveMQException)
+    AMQ_CATCH_EXCEPTION_CONVERT(decaf::lang::exceptions::UnsupportedOperationException, ActiveMQException)
+    AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
+    AMQ_CATCHALL_THROW(ActiveMQException)
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -1218,8 +1212,7 @@ Pointer<Response> ActiveMQConnection::sy
             response = this->config->transport->request(command, timeout);
         }
 
-        commands::ExceptionResponse* exceptionResponse =
-            dynamic_cast<ExceptionResponse*> (response.get());
+        commands::ExceptionResponse* exceptionResponse = dynamic_cast<ExceptionResponse*>(response.get());
 
         if (exceptionResponse != NULL) {
 
@@ -1267,9 +1260,7 @@ void ActiveMQConnection::asyncRequest(Po
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnection::checkClosed() const {
     if (this->isClosed()) {
-        throw ActiveMQException(
-            __FILE__, __LINE__,
-            "ActiveMQConnection::enforceConnected - Connection has already been closed!" );
+        throw ActiveMQException(__FILE__, __LINE__, "ActiveMQConnection::enforceConnected - Connection has already been closed!");
     }
 }
 
@@ -1278,7 +1269,7 @@ void ActiveMQConnection::checkClosedOrFa
 
     checkClosed();
     if (this->transportFailed.get() == true) {
-        throw ConnectionFailedException( *this->config->firstFailureError );
+        throw ConnectionFailedException(*this->config->firstFailureError);
     }
 }
 
@@ -1316,9 +1307,9 @@ void ActiveMQConnection::ensureConnectio
             }
         }
     }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
+    AMQ_CATCH_RETHROW(ActiveMQException)
+    AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
+    AMQ_CATCHALL_THROW(ActiveMQException)
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -1405,19 +1396,17 @@ void ActiveMQConnection::signalInterrupt
     if (cdl->getCount() == 0) {
 
         this->config->transportInterruptionProcessingComplete.reset(NULL);
-        FailoverTransport* failoverTransport =
-            dynamic_cast<FailoverTransport*>(this->config->transport->narrow(typeid(FailoverTransport)));
+        FailoverTransport* failoverTransport = dynamic_cast<FailoverTransport*>(this->config->transport->narrow(typeid(FailoverTransport)));
 
         if (failoverTransport != NULL) {
-            failoverTransport->setConnectionInterruptProcessingComplete(
-                this->config->connectionInfo->getConnectionId());
+            failoverTransport->setConnectionInterruptProcessingComplete(this->config->connectionInfo->getConnectionId());
         }
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnection::setUsername(const std::string& username) {
-    this->config->connectionInfo->setUserName( username );
+    this->config->connectionInfo->setUserName(username);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -1427,7 +1416,7 @@ const std::string& ActiveMQConnection::g
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnection::setPassword(const std::string& password) {
-    this->config->connectionInfo->setPassword( password );
+    this->config->connectionInfo->setPassword(password);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -1491,7 +1480,7 @@ bool ActiveMQConnection::isDispatchAsync
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::setDispatchAsync( bool value ) {
+void ActiveMQConnection::setDispatchAsync(bool value) {
     this->config->dispatchAsync = value;
 }
 
@@ -1675,7 +1664,7 @@ void ActiveMQConnection::deleteTempDesti
 
         this->config->sessionsLock.readLock().lock();
         try {
-            Pointer< Iterator< Pointer<ActiveMQSessionKernel> > > iterator(this->config->activeSessions.iterator());
+            Pointer<Iterator<Pointer<ActiveMQSessionKernel> > > iterator(this->config->activeSessions.iterator());
             while (iterator->hasNext()) {
                 Pointer<ActiveMQSessionKernel> session = iterator->next();
                 if (session->isInUse(destination)) {
@@ -1695,16 +1684,16 @@ void ActiveMQConnection::deleteTempDesti
 
         command->setConnectionId(this->config->connectionInfo->getConnectionId());
         command->setOperationType(ActiveMQConstants::DESTINATION_REMOVE_OPERATION);
-        command->setDestination(Pointer<ActiveMQDestination> (destination->cloneDataStructure()));
+        command->setDestination(Pointer<ActiveMQDestination>(destination->cloneDataStructure()));
 
         // Send the message to the broker.
         syncRequest(command);
     }
-    AMQ_CATCH_RETHROW( NullPointerException )
-    AMQ_CATCH_RETHROW( decaf::lang::exceptions::IllegalStateException )
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
+    AMQ_CATCH_RETHROW(NullPointerException)
+    AMQ_CATCH_RETHROW(decaf::lang::exceptions::IllegalStateException)
+    AMQ_CATCH_RETHROW(ActiveMQException)
+    AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
+    AMQ_CATCHALL_THROW(ActiveMQException)
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -1714,8 +1703,7 @@ void ActiveMQConnection::cleanUpTempDest
         return;
     }
 
-    Pointer< Iterator< Pointer< ActiveMQTempDestination> > > iterator(
-        this->config->activeTempDestinations.values().iterator());
+    Pointer<Iterator<Pointer<ActiveMQTempDestination> > > iterator(this->config->activeTempDestinations.values().iterator());
     while (iterator->hasNext()) {
         Pointer<ActiveMQTempDestination> dest = iterator->next();
 
@@ -1724,13 +1712,13 @@ void ActiveMQConnection::cleanUpTempDest
             // Only delete this temporary destination if it was created from this connection, since the
             // advisory consumer tracks all temporary destinations there can be others in our mapping that
             // this connection did not create.
-            std::string thisConnectionId = this->config->connectionInfo->getConnectionId() != NULL ?
-                this->config->connectionInfo->getConnectionId()->toString() : "";
+            std::string thisConnectionId =
+                    this->config->connectionInfo->getConnectionId() != NULL ? this->config->connectionInfo->getConnectionId()->toString() : "";
             if (dest->getConnectionId() == thisConnectionId) {
                 this->deleteTempDestination(dest);
             }
 
-        } catch(Exception& ex) {
+        } catch (Exception& ex) {
         }
     }
 }