You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/06/07 17:20:52 UTC
svn commit: r1347671 - in
/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core:
ActiveMQConnection.cpp kernels/ActiveMQSessionKernel.cpp
Author: tabish
Date: Thu Jun 7 15:20:52 2012
New Revision: 1347671
URL: http://svn.apache.org/viewvc?rev=1347671&view=rev
Log:
Switch back to using the CopyOnWriteArrayList now that its using the ReentrantReadWriteLock
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=1347671&r1=1347670&r2=1347671&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Thu Jun 7 15:20:52 2012
@@ -160,7 +160,7 @@ namespace core{
DispatcherMap dispatchers;
ProducerMap activeProducers;
- decaf::util::LinkedList< Pointer<ActiveMQSessionKernel> > activeSessions;
+ decaf::util::concurrent::CopyOnWriteArrayList< Pointer<ActiveMQSessionKernel> > activeSessions;
decaf::util::LinkedList<transport::TransportListener*> transportListeners;
TempDestinationMap activeTempDestinations;
@@ -531,24 +531,20 @@ void ActiveMQConnection::close() {
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
- // Get the complete list of active sessions and call dispose() which should not trigger
- // any messages back to the broker, remove each before the dispose call to avoid a
- // concurrent modification of the list.
+ // Get the complete list of active sessions.
+ std::auto_ptr< Iterator<Pointer<ActiveMQSessionKernel> > > iter(this->config->activeSessions.iterator());
+
long long lastDeliveredSequenceId = 0;
- synchronized(&this->config->activeSessions) {
- std::auto_ptr< Iterator<Pointer<ActiveMQSessionKernel> > > iter(this->config->activeSessions.iterator());
- // Dispose of all the Session resources we know are still open.
- while (iter->hasNext()) {
- Pointer<ActiveMQSessionKernel> session = iter->next();
- iter->remove();
- try{
- session->dispose();
- lastDeliveredSequenceId =
- Math::max(lastDeliveredSequenceId, session->getLastDeliveredSequenceId());
- } catch( cms::CMSException& ex ){
- /* Absorb */
- }
+ // Dispose of all the Session resources we know are still open.
+ while (iter->hasNext()) {
+ Pointer<ActiveMQSessionKernel> session = iter->next();
+ try{
+ session->dispose();
+ lastDeliveredSequenceId =
+ Math::max(lastDeliveredSequenceId, session->getLastDeliveredSequenceId());
+ } catch( cms::CMSException& ex ){
+ /* Absorb */
}
}
@@ -585,15 +581,16 @@ void ActiveMQConnection::cleanup() {
try {
- synchronized(&this->config->activeSessions) {
- std::auto_ptr< Iterator< Pointer<ActiveMQSessionKernel> > > iter( this->config->activeSessions.iterator() );
- while (iter->hasNext()) {
- Pointer<ActiveMQSessionKernel> session = iter->next();
- iter->remove();
- try{
- session->dispose();
- } catch( cms::CMSException& ex ){
- }
+ // Get the complete list of active sessions.
+ std::auto_ptr< Iterator< Pointer<ActiveMQSessionKernel> > > iter( this->config->activeSessions.iterator() );
+
+ // Dispose of all the Session resources we know are still open.
+ while (iter->hasNext()) {
+ Pointer<ActiveMQSessionKernel> session = iter->next();
+ try{
+ session->dispose();
+ } catch( cms::CMSException& ex ){
+ /* Absorb */
}
}
@@ -905,16 +902,15 @@ void ActiveMQConnection::onConsumerContr
Pointer<ConsumerControl> consumerControl = command.dynamicCast<ConsumerControl>();
- synchronized(&this->config->activeSessions) {
- std::auto_ptr<Iterator<Pointer<ActiveMQSessionKernel> > > iter(this->config->activeSessions.iterator());
+ // Get the complete list of active sessions.
+ std::auto_ptr< Iterator< Pointer<ActiveMQSessionKernel> > > iter( this->config->activeSessions.iterator() );
- while (iter->hasNext()) {
- Pointer<ActiveMQSessionKernel> session = iter->next();
- if (consumerControl->isClose()) {
- session->close(consumerControl->getConsumerId());
- } else {
- session->setPrefetchSize(consumerControl->getConsumerId(), consumerControl->getPrefetch());
- }
+ while (iter->hasNext()) {
+ Pointer<ActiveMQSessionKernel> session = iter->next();
+ if (consumerControl->isClose()) {
+ session->close(consumerControl->getConsumerId());
+ } else {
+ session->setPrefetchSize(consumerControl->getConsumerId(), consumerControl->getPrefetch());
}
}
}
@@ -1454,13 +1450,11 @@ void ActiveMQConnection::deleteTempDesti
checkClosedOrFailed();
ensureConnectionInfoSent();
- synchronized(&this->config->activeSessions) {
- Pointer< Iterator< Pointer<ActiveMQSessionKernel> > > iterator(this->config->activeSessions.iterator());
- while (iterator->hasNext()) {
- Pointer<ActiveMQSessionKernel> session = iterator->next();
- if (session->isInUse(destination)) {
- throw ActiveMQException(__FILE__, __LINE__, "A consumer is consuming from the temporary destination");
- }
+ Pointer< Iterator< Pointer<ActiveMQSessionKernel> > > iterator(this->config->activeSessions.iterator());
+ while (iterator->hasNext()) {
+ Pointer<ActiveMQSessionKernel> session = iterator->next();
+ if (session->isInUse(destination)) {
+ throw ActiveMQException(__FILE__, __LINE__, "A consumer is consuming from the temporary destination");
}
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp?rev=1347671&r1=1347670&r2=1347671&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp Thu Jun 7 15:20:52 2012
@@ -56,7 +56,6 @@
#include <decaf/lang/Long.h>
#include <decaf/lang/Math.h>
#include <decaf/util/Queue.h>
-#include <decaf/util/LinkedList.h>
#include <decaf/util/concurrent/Mutex.h>
#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
#include <decaf/lang/exceptions/InvalidStateException.h>
@@ -98,7 +97,7 @@ namespace kernels{
public:
AtomicBoolean synchronizationRegistered;
- decaf::util::LinkedList< Pointer<ActiveMQProducerKernel> > producers;
+ decaf::util::concurrent::CopyOnWriteArrayList< Pointer<ActiveMQProducerKernel> > producers;
Pointer<Scheduler> scheduler;
Pointer<CloseSynhcronization> closeSync;
ConsumersMap consumers;
@@ -363,16 +362,13 @@ void ActiveMQSessionKernel::dispose() {
}
// Dispose of all Producers, the dispose method skips the RemoveInfo command.
- synchronized(&this->config->producers) {
- std::auto_ptr<Iterator<Pointer<ActiveMQProducerKernel> > > producerIter(this->config->producers.iterator());
+ std::auto_ptr<Iterator<Pointer<ActiveMQProducerKernel> > > producerIter(this->config->producers.iterator());
- while (producerIter->hasNext()) {
- Pointer<ActiveMQProducerKernel> producer = producerIter->next();
- producerIter->remove();
- try {
- producer->dispose();
- } catch (cms::CMSException& ex) {
- }
+ while (producerIter->hasNext()) {
+ try{
+ producerIter->next()->dispose();
+ } catch (cms::CMSException& ex) {
+ /* Absorb */
}
}
}
@@ -1189,9 +1185,7 @@ void ActiveMQSessionKernel::addProducer(
try {
this->checkClosed();
- synchronized(&this->config->producers) {
- this->config->producers.add(producer);
- }
+ this->config->producers.add(producer);
this->connection->addProducer(producer);
}
AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
@@ -1204,9 +1198,7 @@ void ActiveMQSessionKernel::removeProduc
try {
this->connection->removeProducer(producer->getProducerId());
- synchronized(&this->config->producers) {
- this->config->producers.remove(producer);
- }
+ this->config->producers.remove(producer);
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -1216,13 +1208,12 @@ void ActiveMQSessionKernel::removeProduc
////////////////////////////////////////////////////////////////////////////////
Pointer<ActiveMQProducerKernel> ActiveMQSessionKernel::lookupProducerKernel(Pointer<ProducerId> id) {
- synchronized(&this->config->producers) {
- std::auto_ptr<Iterator<Pointer<ActiveMQProducerKernel> > > producerIter(this->config->producers.iterator());
- while (producerIter->hasNext()) {
- Pointer<ActiveMQProducerKernel> producer = producerIter->next();
- if (producer->getProducerId()->equals(*id)) {
- return producer;
- }
+ std::auto_ptr<Iterator<Pointer<ActiveMQProducerKernel> > > producerIter(this->config->producers.iterator());
+
+ while (producerIter->hasNext()) {
+ Pointer<ActiveMQProducerKernel> producer = producerIter->next();
+ if (producer->getProducerId()->equals(*id)) {
+ return producer;
}
}