You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by nm...@apache.org on 2007/03/25 16:55:03 UTC
svn commit: r522273 - in
/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core:
ActiveMQSession.cpp ActiveMQSessionExecutor.cpp ActiveMQSessionExecutor.h
Author: nmittler
Date: Sun Mar 25 07:55:02 2007
New Revision: 522273
URL: http://svn.apache.org/viewvc?view=rev&rev=522273
Log:
AMQCPP-83 - updates to purge messages from session executor for closing connection
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?view=diff&rev=522273&r1=522272&r2=522273
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Sun Mar 25 07:55:02 2007
@@ -722,6 +722,7 @@
if( consumer != NULL )
{
+ // If the executor thread is currently running, stop it.
bool wasStarted = isStarted();
if( wasStarted ) {
stop();
@@ -736,12 +737,35 @@
transaction->removeFromTransaction(
consumer->getConsumerId() );
}
-
- // Remove this consumer from the consumers map.
+
+ ActiveMQConsumer* obj = NULL;
synchronized( &consumers ) {
- consumers.remove( consumer->getConsumerId() );
+
+ if( consumers.containsKey( consumer->getConsumerId() ) ) {
+
+ // Get the consumer reference
+ obj = consumers.getValue( consumer->getConsumerId() );
+
+ // Remove this consumer from the map.
+ consumers.remove( consumer->getConsumerId() );
+ }
+ }
+
+ // Clean up any resources in the executor for this consumer
+ if( obj != NULL && executor != NULL ) {
+
+ // Purge any pending messages for this consumer.
+ vector<ActiveMQMessage*> messages =
+ executor->purgeConsumerMessages(obj);
+
+ // Destroy the messages.
+ for( unsigned int ix=0; ix<messages.size(); ++ix ) {
+ delete messages[ix];
+ }
}
+ // If the executor thread was previously running, start it back
+ // up.
if( wasStarted ) {
start();
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp?view=diff&rev=522273&r1=522272&r2=522273
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp Sun Mar 25 07:55:02 2007
@@ -21,6 +21,7 @@
#include "ActiveMQConsumer.h"
#include <activemq/connector/ConsumerInfo.h>
+using namespace std;
using namespace activemq;
using namespace activemq::core;
using namespace activemq::util;
@@ -31,6 +32,7 @@
ActiveMQSessionExecutor::ActiveMQSessionExecutor( ActiveMQSession* session ) {
this->session = session;
+ this->closed = false;
this->started = false;
this->thread = NULL;
}
@@ -40,9 +42,9 @@
try {
- // Stop the thread if it's running.
- stop();
-
+ // Terminate the thread.
+ close();
+
// Empty the message queue and destroy any remaining messages.
clear();
}
@@ -50,12 +52,28 @@
}
////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionExecutor::close() {
+
+ synchronized( &mutex ) {
+
+ closed = true;
+ mutex.notifyAll();
+ }
+
+ if( thread != NULL ) {
+ thread->join();
+ delete thread;
+ thread = NULL;
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
void ActiveMQSessionExecutor::execute( DispatchData& data ) {
// Add the data to the queue.
- synchronized( &messageQueue ) {
- messageQueue.push( data );
- wakeup();
+ synchronized( &mutex ) {
+ messageQueue.push_back( data );
+ mutex.notifyAll();
}
}
@@ -63,16 +81,47 @@
void ActiveMQSessionExecutor::executeFirst( DispatchData& data ) {
// Add the data to the front of the queue.
- synchronized( &messageQueue ) {
- messageQueue.enqueueFront( data );
- wakeup();
+ synchronized( &mutex ) {
+ messageQueue.push_front( data );
+ mutex.notifyAll();
}
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSessionExecutor::start() {
+vector<ActiveMQMessage*> ActiveMQSessionExecutor::purgeConsumerMessages(
+ ActiveMQConsumer* consumer )
+{
+ vector<ActiveMQMessage*> retVal;
+
+ const connector::ConsumerInfo* consumerInfo = consumer->getConsumerInfo();
+
+ synchronized( &mutex ) {
+
+ list<DispatchData>::iterator iter = messageQueue.begin();
+ while( iter != messageQueue.end() ) {
+ list<DispatchData>::iterator currentIter = iter;
+ DispatchData& dispatchData = *iter++;
+ if( consumerInfo == dispatchData.getConsumer() ||
+ consumerInfo->getConsumerId() == dispatchData.getConsumer()->getConsumerId() )
+ {
+ retVal.push_back( dispatchData.getMessage() );
+ messageQueue.erase(currentIter);
+ }
+ }
+ }
+
+ return retVal;
+}
- synchronized( &messageQueue ) {
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionExecutor::start() {
+
+ synchronized( &mutex ) {
+
+ if( closed || started ) {
+ return;
+ }
+
started = true;
// Don't create the thread unless we need to.
@@ -81,37 +130,41 @@
thread->start();
}
- wakeup();
+ mutex.notifyAll();
}
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSessionExecutor::stop() {
+
+ synchronized( &mutex ) {
+
+ if( closed || !started ) {
+ return;
+ }
- synchronized( &messageQueue ) {
-
+ // Set the state to stopped.
started = false;
- wakeup();
- }
-
- if( thread != NULL ) {
- thread->join();
- delete thread;
- thread = NULL;
+
+ // Wakeup the thread so that it can acknowledge the stop request.
+ mutex.notifyAll();
+
+ // Wait for the thread to notify us that it has acknowledged
+ // the stop request.
+ mutex.wait();
}
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSessionExecutor::clear() {
- synchronized( &messageQueue ) {
+ synchronized( &mutex ) {
- while( !messageQueue.empty() ) {
- DispatchData data = messageQueue.pop();
+ list<DispatchData>::iterator iter = messageQueue.begin();
+ while( iter != messageQueue.end() ) {
+ DispatchData data = *iter++;
delete data.getMessage();
}
-
- wakeup();
}
}
@@ -138,7 +191,7 @@
} catch( ActiveMQException& ex ) {
ex.setMark(__FILE__, __LINE__ );
ex.printStackTrace();
- } catch( std::exception& ex ) {
+ } catch( exception& ex ) {
ActiveMQException amqex( __FILE__, __LINE__, ex.what() );
amqex.printStackTrace();
} catch( ... ) {
@@ -152,17 +205,29 @@
try {
- while( started ) {
+ while( true ) {
// Dispatch all currently available messages.
dispatchAll();
- synchronized( &messageQueue ) {
+ synchronized( &mutex ) {
+
+ // If we're closing down, exit the thread.
+ if( closed ) {
+ return;
+ }
+
+ // When told to stop, the calling thread will wait for a
+ // responding notification, indicating that we have acknowledged
+ // the stop command.
+ if( !started ) {
+ mutex.notifyAll();
+ }
- if( messageQueue.empty() && started ) {
+ if( messageQueue.empty() || !started ) {
// Wait for more data or to be woken up.
- messageQueue.wait();
+ mutex.wait();
}
}
}
@@ -170,7 +235,7 @@
} catch( ActiveMQException& ex ) {
ex.setMark(__FILE__, __LINE__ );
session->fire( ex );
- } catch( std::exception& stdex ) {
+ } catch( exception& stdex ) {
ActiveMQException ex( __FILE__, __LINE__, stdex.what() );
session->fire( ex );
} catch( ... ) {
@@ -181,27 +246,32 @@
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSessionExecutor::dispatchAll() {
-
+
// Take out all of the dispatch data currently in the array.
- std::vector<DispatchData> dataList;
- synchronized( &messageQueue ) {
+ list<DispatchData> dataList;
+ synchronized( &mutex ) {
+
+ // When told to stop, the calling thread will wait for a
+ // responding notification, indicating that we have acknowledged
+ // the stop command.
+ if( !started ) {
+ mutex.notifyAll();
+ }
+
+ if( !started || closed ) {
+ return;
+ }
- dataList = messageQueue.toArray();
+ dataList = messageQueue;
messageQueue.clear();
}
-
+
// Dispatch all currently available messages.
- for( unsigned int ix=0; ix<dataList.size(); ++ix ) {
- DispatchData& data = dataList[ix];
+ list<DispatchData>::iterator iter = dataList.begin();
+ while( iter != dataList.end() ) {
+ DispatchData& data = *iter++;
dispatch( data );
}
}
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSessionExecutor::wakeup() {
-
- synchronized( &messageQueue ) {
- messageQueue.notifyAll();
- }
-}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h?view=diff&rev=522273&r1=522272&r2=522273
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h Sun Mar 25 07:55:02 2007
@@ -21,12 +21,15 @@
#include <activemq/core/Dispatcher.h>
#include <activemq/concurrent/Runnable.h>
#include <activemq/concurrent/Thread.h>
-#include <activemq/util/Queue.h>
+#include <activemq/concurrent/Mutex.h>
+#include <vector>
+#include <list>
namespace activemq{
namespace core{
class ActiveMQSession;
+ class ActiveMQConsumer;
/**
* Delegate dispatcher for a single session. Contains a thread
@@ -39,9 +42,11 @@
private:
ActiveMQSession* session;
- util::Queue<DispatchData> messageQueue;
- bool started;
+ std::list<DispatchData> messageQueue;
concurrent::Thread* thread;
+ concurrent::Mutex mutex;
+ bool started;
+ bool closed;
public:
@@ -70,6 +75,14 @@
virtual void executeFirst( DispatchData& data );
/**
+ * Removes all messages for the given consumer from the queue and
+ * returns them.
+ * @param consumer the subject consumer
+ * @return all messages that were queued for the consumer.
+ */
+ virtual std::vector<ActiveMQMessage*> purgeConsumerMessages( ActiveMQConsumer* consumer );
+
+ /**
* Starts the dispatching.
*/
virtual void start();
@@ -80,6 +93,12 @@
virtual void stop();
/**
+ * Terminates the dispatching thread. Once this is called, the executor is no longer
+ * usable.
+ */
+ virtual void close();
+
+ /**
* Indicates if the executor is started
*/
virtual bool isStarted() const {
@@ -90,13 +109,6 @@
* Removes all queued messages and destroys them.
*/
virtual void clear();
-
- /**
- * Depending on whether or not the session is async,
- * notifies the thread or simply dispatches all available
- * messages synchronously.
- */
- virtual void wakeup();
/**
* Dispatches a message to a particular consumer.