You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/06/07 17:20:52 UTC

svn commit: r1347671 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core: ActiveMQConnection.cpp kernels/ActiveMQSessionKernel.cpp

Author: tabish
Date: Thu Jun  7 15:20:52 2012
New Revision: 1347671

URL: http://svn.apache.org/viewvc?rev=1347671&view=rev
Log:
Switch back to using the CopyOnWriteArrayList now that it's using the ReentrantReadWriteLock

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=1347671&r1=1347670&r2=1347671&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Thu Jun  7 15:20:52 2012
@@ -160,7 +160,7 @@ namespace core{
         DispatcherMap dispatchers;
         ProducerMap activeProducers;
 
-        decaf::util::LinkedList< Pointer<ActiveMQSessionKernel> > activeSessions;
+        decaf::util::concurrent::CopyOnWriteArrayList< Pointer<ActiveMQSessionKernel> > activeSessions;
         decaf::util::LinkedList<transport::TransportListener*> transportListeners;
 
         TempDestinationMap activeTempDestinations;
@@ -531,24 +531,20 @@ void ActiveMQConnection::close() {
             AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
         }
 
-        // Get the complete list of active sessions and call dispose() which should not trigger
-        // any messages back to the broker, remove each before the dispose call to avoid a
-        // concurrent modification of the list.
+        // Get the complete list of active sessions.
+        std::auto_ptr< Iterator<Pointer<ActiveMQSessionKernel> > > iter(this->config->activeSessions.iterator());
+
         long long lastDeliveredSequenceId = 0;
-        synchronized(&this->config->activeSessions) {
-            std::auto_ptr< Iterator<Pointer<ActiveMQSessionKernel> > > iter(this->config->activeSessions.iterator());
 
-            // Dispose of all the Session resources we know are still open.
-            while (iter->hasNext()) {
-                Pointer<ActiveMQSessionKernel> session = iter->next();
-                iter->remove();
-                try{
-                    session->dispose();
-                    lastDeliveredSequenceId =
-                        Math::max(lastDeliveredSequenceId, session->getLastDeliveredSequenceId());
-                } catch( cms::CMSException& ex ){
-                    /* Absorb */
-                }
+        // Dispose of all the Session resources we know are still open.
+        while (iter->hasNext()) {
+            Pointer<ActiveMQSessionKernel> session = iter->next();
+            try{
+                session->dispose();
+                lastDeliveredSequenceId =
+                    Math::max(lastDeliveredSequenceId, session->getLastDeliveredSequenceId());
+            } catch( cms::CMSException& ex ){
+                /* Absorb */
             }
         }
 
@@ -585,15 +581,16 @@ void ActiveMQConnection::cleanup() {
 
     try {
 
-        synchronized(&this->config->activeSessions) {
-            std::auto_ptr< Iterator< Pointer<ActiveMQSessionKernel> > > iter( this->config->activeSessions.iterator() );
-            while (iter->hasNext()) {
-                Pointer<ActiveMQSessionKernel> session = iter->next();
-                iter->remove();
-                try{
-                    session->dispose();
-                } catch( cms::CMSException& ex ){
-                }
+        // Get the complete list of active sessions.
+        std::auto_ptr< Iterator< Pointer<ActiveMQSessionKernel> > > iter( this->config->activeSessions.iterator() );
+
+        // Dispose of all the Session resources we know are still open.
+        while (iter->hasNext()) {
+            Pointer<ActiveMQSessionKernel> session = iter->next();
+            try{
+                session->dispose();
+            } catch( cms::CMSException& ex ){
+                /* Absorb */
             }
         }
 
@@ -905,16 +902,15 @@ void ActiveMQConnection::onConsumerContr
 
     Pointer<ConsumerControl> consumerControl = command.dynamicCast<ConsumerControl>();
 
-    synchronized(&this->config->activeSessions) {
-        std::auto_ptr<Iterator<Pointer<ActiveMQSessionKernel> > > iter(this->config->activeSessions.iterator());
+    // Get the complete list of active sessions.
+    std::auto_ptr< Iterator< Pointer<ActiveMQSessionKernel> > > iter( this->config->activeSessions.iterator() );
 
-        while (iter->hasNext()) {
-            Pointer<ActiveMQSessionKernel> session = iter->next();
-            if (consumerControl->isClose()) {
-                session->close(consumerControl->getConsumerId());
-            } else {
-                session->setPrefetchSize(consumerControl->getConsumerId(), consumerControl->getPrefetch());
-            }
+    while (iter->hasNext()) {
+        Pointer<ActiveMQSessionKernel> session = iter->next();
+        if (consumerControl->isClose()) {
+            session->close(consumerControl->getConsumerId());
+        } else {
+            session->setPrefetchSize(consumerControl->getConsumerId(), consumerControl->getPrefetch());
         }
     }
 }
@@ -1454,13 +1450,11 @@ void ActiveMQConnection::deleteTempDesti
         checkClosedOrFailed();
         ensureConnectionInfoSent();
 
-        synchronized(&this->config->activeSessions) {
-            Pointer< Iterator< Pointer<ActiveMQSessionKernel> > > iterator(this->config->activeSessions.iterator());
-            while (iterator->hasNext()) {
-                Pointer<ActiveMQSessionKernel> session = iterator->next();
-                if (session->isInUse(destination)) {
-                    throw ActiveMQException(__FILE__, __LINE__, "A consumer is consuming from the temporary destination");
-                }
+        Pointer< Iterator< Pointer<ActiveMQSessionKernel> > > iterator(this->config->activeSessions.iterator());
+        while (iterator->hasNext()) {
+            Pointer<ActiveMQSessionKernel> session = iterator->next();
+            if (session->isInUse(destination)) {
+                throw ActiveMQException(__FILE__, __LINE__, "A consumer is consuming from the temporary destination");
             }
         }
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp?rev=1347671&r1=1347670&r2=1347671&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp Thu Jun  7 15:20:52 2012
@@ -56,7 +56,6 @@
 #include <decaf/lang/Long.h>
 #include <decaf/lang/Math.h>
 #include <decaf/util/Queue.h>
-#include <decaf/util/LinkedList.h>
 #include <decaf/util/concurrent/Mutex.h>
 #include <decaf/util/concurrent/atomic/AtomicBoolean.h>
 #include <decaf/lang/exceptions/InvalidStateException.h>
@@ -98,7 +97,7 @@ namespace kernels{
     public:
 
         AtomicBoolean synchronizationRegistered;
-        decaf::util::LinkedList< Pointer<ActiveMQProducerKernel> > producers;
+        decaf::util::concurrent::CopyOnWriteArrayList< Pointer<ActiveMQProducerKernel> > producers;
         Pointer<Scheduler> scheduler;
         Pointer<CloseSynhcronization> closeSync;
         ConsumersMap consumers;
@@ -363,16 +362,13 @@ void ActiveMQSessionKernel::dispose() {
         }
 
         // Dispose of all Producers, the dispose method skips the RemoveInfo command.
-        synchronized(&this->config->producers) {
-            std::auto_ptr<Iterator<Pointer<ActiveMQProducerKernel> > > producerIter(this->config->producers.iterator());
+        std::auto_ptr<Iterator<Pointer<ActiveMQProducerKernel> > > producerIter(this->config->producers.iterator());
 
-            while (producerIter->hasNext()) {
-                Pointer<ActiveMQProducerKernel> producer = producerIter->next();
-                producerIter->remove();
-                try {
-                    producer->dispose();
-                } catch (cms::CMSException& ex) {
-                }
+        while (producerIter->hasNext()) {
+            try{
+                producerIter->next()->dispose();
+            } catch (cms::CMSException& ex) {
+                /* Absorb */
             }
         }
     }
@@ -1189,9 +1185,7 @@ void ActiveMQSessionKernel::addProducer(
 
     try {
         this->checkClosed();
-        synchronized(&this->config->producers) {
-            this->config->producers.add(producer);
-        }
+        this->config->producers.add(producer);
         this->connection->addProducer(producer);
     }
     AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
@@ -1204,9 +1198,7 @@ void ActiveMQSessionKernel::removeProduc
 
     try {
         this->connection->removeProducer(producer->getProducerId());
-        synchronized(&this->config->producers) {
-            this->config->producers.remove(producer);
-        }
+        this->config->producers.remove(producer);
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -1216,13 +1208,12 @@ void ActiveMQSessionKernel::removeProduc
 ////////////////////////////////////////////////////////////////////////////////
 Pointer<ActiveMQProducerKernel> ActiveMQSessionKernel::lookupProducerKernel(Pointer<ProducerId> id) {
 
-    synchronized(&this->config->producers) {
-        std::auto_ptr<Iterator<Pointer<ActiveMQProducerKernel> > > producerIter(this->config->producers.iterator());
-        while (producerIter->hasNext()) {
-            Pointer<ActiveMQProducerKernel> producer = producerIter->next();
-            if (producer->getProducerId()->equals(*id)) {
-                return producer;
-            }
+    std::auto_ptr<Iterator<Pointer<ActiveMQProducerKernel> > > producerIter(this->config->producers.iterator());
+
+    while (producerIter->hasNext()) {
+        Pointer<ActiveMQProducerKernel> producer = producerIter->next();
+        if (producer->getProducerId()->equals(*id)) {
+            return producer;
         }
     }