You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by nm...@apache.org on 2007/03/18 11:54:19 UTC
svn commit: r519590 - in /activemq/activemq-cpp/trunk/activemq-cpp/src:
main/ main/activemq/core/ main/activemq/util/ main/cms/
test-integration/integration/connector/openwire/
test/activemq/connector/stomp/ test/activemq/core/
Author: nmittler
Date: Sun Mar 18 03:54:18 2007
New Revision: 519590
URL: http://svn.apache.org/viewvc?view=rev&rev=519590
Log:
AMQCPP-90 - moving the async thread to session instead of consumers
Added:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/DispatchData.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Dispatcher.h
Removed:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessageListener.h
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/Queue.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.h
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTempDestinationTest.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompSessionManagerTest.h
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.h
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am?view=diff&rev=519590&r1=519589&r2=519590
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am Sun Mar 18 03:54:18 2007
@@ -23,6 +23,7 @@
activemq/core/ActiveMQTransaction.cpp \
activemq/core/ActiveMQProducer.cpp \
activemq/core/ActiveMQConnectionFactory.cpp \
+ activemq/core/ActiveMQSessionExecutor.cpp \
activemq/logger/LoggerHierarchy.cpp \
activemq/logger/LogWriter.cpp \
activemq/logger/Logger.cpp \
@@ -96,12 +97,14 @@
activemq/core/ActiveMQDestination.h \
activemq/core/ActiveMQConnection.h \
activemq/core/ActiveMQTransaction.h \
- activemq/core/ActiveMQMessageListener.h \
activemq/core/ActiveMQConnectionFactory.h \
activemq/core/ActiveMQConsumer.h \
activemq/core/ActiveMQSession.h \
activemq/core/ActiveMQAckHandler.h \
activemq/core/ActiveMQConstants.h \
+ activemq/core/ActiveMQSessionExecutor.h \
+ activemq/core/DispatchData.h \
+ activemq/core/Dispatcher.h \
activemq/logger/LoggerCommon.h \
activemq/logger/LogRecord.h \
activemq/logger/SimpleLogger.h \
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?view=diff&rev=519590&r1=519589&r2=519590
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Sun Mar 18 03:54:18 2007
@@ -21,10 +21,12 @@
#include <activemq/core/ActiveMQSession.h>
#include <activemq/core/ActiveMQConsumer.h>
#include <activemq/exceptions/NullPointerException.h>
+#include <activemq/util/Boolean.h>
using namespace cms;
using namespace activemq;
using namespace activemq::core;
+using namespace activemq::util;
using namespace activemq::connector;
using namespace activemq::exceptions;
using namespace std;
@@ -36,6 +38,9 @@
this->started = false;
this->closed = false;
this->exceptionListener = NULL;
+
+ alwaysSessionAsync = Boolean::parseBoolean(
+ connectionData->getProperties().getProperty( "alwaysSessionAsync", "true" ) );
// Register for messages and exceptions from the connector.
Connector* connector = connectionData->getConnector();
@@ -55,34 +60,63 @@
}
////////////////////////////////////////////////////////////////////////////////
-cms::Session* ActiveMQConnection::createSession()
- throw ( cms::CMSException )
+void ActiveMQConnection::addDispatcher( connector::ConsumerInfo* consumer,
+ Dispatcher* dispatcher )
+{
+ // Add the consumer to the map.
+ synchronized( &dispatchers )
+ {
+ dispatchers.setValue( consumer->getConsumerId(), dispatcher );
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::removeDispatcher( const connector::ConsumerInfo* consumer ) {
+
+ // Remove the consumer from the map.
+ synchronized( &dispatchers )
+ {
+ dispatchers.remove( consumer->getConsumerId() );
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::Session* ActiveMQConnection::createSession() throw ( cms::CMSException )
{
try
{
- return this->createSession( Session::AUTO_ACKNOWLEDGE );
+ return createSession( Session::AUTO_ACKNOWLEDGE );
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
-cms::Session* ActiveMQConnection::createSession(
- cms::Session::AcknowledgeMode ackMode )
- throw ( cms::CMSException )
+cms::Session* ActiveMQConnection::createSession(
+ cms::Session::AcknowledgeMode ackMode ) throw ( cms::CMSException )
{
try
{
+ // Determine whether or not to make dispatch for this session asynchronous
+ bool doSessionAsync = alwaysSessionAsync || !activeSessions.isEmpty() ||
+ ackMode==Session::SESSION_TRANSACTED || ackMode==Session::CLIENT_ACKNOWLEDGE;
+
// Create the session instance.
ActiveMQSession* session = new ActiveMQSession(
connectionData->getConnector()->createSession( ackMode ),
connectionData->getProperties(),
- this );
+ this,
+ doSessionAsync );
// Add the session to the set of active sessions.
synchronized( &activeSessions ) {
activeSessions.add( session );
}
+
+ // If we're already started, start the session.
+ if( started ) {
+ session->start();
+ }
return session;
}
@@ -107,7 +141,7 @@
}
// Get the complete list of active sessions.
- std::vector<cms::Session*> allSessions;
+ std::vector<ActiveMQSession*> allSessions;
synchronized( &activeSessions ) {
allSessions = activeSessions.toArray();
}
@@ -145,6 +179,12 @@
// messages delivered while this connection is stopped are dropped
// and not acknowledged.
started = true;
+
+ // Start all the sessions.
+ std::vector<ActiveMQSession*> sessions = activeSessions.toArray();
+ for( unsigned int ix=0; ix<sessions.size(); ++ix ) {
+ sessions[ix]->start();
+ }
}
////////////////////////////////////////////////////////////////////////////////
@@ -153,26 +193,11 @@
// Once current deliveries are done this stops the delivery of any
// new messages.
started = false;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::addMessageListener( long long consumerId,
- ActiveMQMessageListener* listener )
-{
- // Place in Map
- synchronized( &consumers )
- {
- consumers.setValue( consumerId, listener );
- }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::removeMessageListener( long long consumerId )
-{
- // Remove from Map
- synchronized( &consumers )
- {
- consumers.remove( consumerId );
+
+ // Stop all the sessions.
+ std::vector<ActiveMQSession*> sessions = activeSessions.toArray();
+ for( unsigned int ix=0; ix<sessions.size(); ++ix ) {
+ sessions[ix]->stop();
}
}
@@ -211,16 +236,17 @@
return;
}
- // Started, so lock map and dispatch the message.
- synchronized( &consumers )
+ // Look up the dispatcher.
+ Dispatcher* dispatcher = NULL;
+ synchronized( &dispatchers )
{
- if( consumers.containsKey(consumer->getConsumerId()) )
- {
- ActiveMQMessageListener* listener =
- consumers.getValue(consumer->getConsumerId());
-
- listener->onActiveMQMessage( message );
- }
+ dispatcher = dispatchers.getValue(consumer->getConsumerId());
+ }
+
+ // Dispatch the message.
+ if( dispatcher != NULL ) {
+ DispatchData data( consumer, message );
+ dispatcher->dispatch( data );
}
}
catch( exceptions::ActiveMQException& ex )
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?view=diff&rev=519590&r1=519589&r2=519590
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h Sun Mar 18 03:54:18 2007
@@ -21,8 +21,8 @@
#include <cms/Connection.h>
#include <cms/ExceptionListener.h>
#include <activemq/core/ActiveMQConnectionData.h>
-#include <activemq/core/ActiveMQMessageListener.h>
#include <activemq/core/ActiveMQMessage.h>
+#include <activemq/core/Dispatcher.h>
#include <activemq/connector/ConsumerMessageListener.h>
#include <activemq/util/Properties.h>
#include <activemq/util/Map.h>
@@ -31,6 +31,11 @@
#include <string>
namespace activemq{
+
+ namespace connector {
+ class ConsumerInfo;
+ }
+
namespace core{
class cms::Session;
@@ -70,14 +75,20 @@
bool closed;
/**
- * Map of Consumer Ids to ActiveMQMessageListeners
+ * Map of message dispatchers indexed by consumer id.
*/
- util::Map< long long, ActiveMQMessageListener* > consumers;
+ util::Map< long long, Dispatcher* > dispatchers;
/**
* Maintain the set of all active sessions.
*/
- util::Set<cms::Session*> activeSessions;
+ util::Set<ActiveMQSession*> activeSessions;
+
+ /**
+ * If true, dispatch for all sessions will be asynchronous to the
+ * transport.
+ */
+ bool alwaysSessionAsync;
public:
@@ -95,6 +106,19 @@
* @param session The session to be unregistered from this connection.
*/
virtual void removeSession( ActiveMQSession* session ) throw ( cms::CMSException );
+
+ /**
+ * Adds a dispatcher for a consumer.
+ * @param consumer - The consumer for which to register a dispatcher.
+ * @param dispatcher - The dispatcher to handle incoming messages for the consumer.
+ */
+ virtual void addDispatcher( connector::ConsumerInfo* consumer, Dispatcher* dispatcher );
+
+ /**
+ * Removes the dispatcher for a consumer.
+ * @param consumer - The consumer for which to remove the dispatcher.
+ */
+ virtual void removeDispatcher( const connector::ConsumerInfo* consumer );
public: // Connection Interface Methods
@@ -161,24 +185,6 @@
* @throws CMSException
*/
virtual void stop() throw ( cms::CMSException );
-
- public: // ActiveMQConnection Methods
-
- /**
- * Adds the ActiveMQMessageListener to the Mapping of Consumer Id's
- * to listeners, all message to that id will be routed to the given
- * listener
- * @param consumerId Consumer Id String
- * @param listener ActiveMQMessageListener Pointer
- */
- virtual void addMessageListener( long long consumerId,
- ActiveMQMessageListener* listener );
-
- /**
- * Remove the Listener for the specified Consumer Id
- * @param consumerId Consumer Id string
- */
- virtual void removeMessageListener( long long consumerId );
public: // ExceptionListener interface methods
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp?view=diff&rev=519590&r1=519589&r2=519590
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp Sun Mar 18 03:54:18 2007
@@ -41,11 +41,10 @@
__FILE__, __LINE__,
"ActiveMQConsumer::ActiveMQConsumer - Init with NULL Session");
}
-
+
// Init Producer Data
this->session = session;
this->consumerInfo = consumerInfo;
- this->listenerThread = NULL;
this->listener = NULL;
this->closed = false;
@@ -67,62 +66,51 @@
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::close()
+void ActiveMQConsumer::close()
throw ( cms::CMSException )
{
try
{
if( !closed ) {
-
+
// Identifies any errors encountered during shutdown.
bool haveException = false;
- ActiveMQException error;
-
+ ActiveMQException error;
+
// Close the ConsumerInfo
if( !consumerInfo->isClosed() ) {
try{
- // We don't want a callback now
- this->consumerInfo->removeListener( this );
- this->consumerInfo->close();
- } catch ( ActiveMQException& ex ){
- if( !haveException ){
- ex.setMark( __FILE__, __LINE__ );
+ // We don't want a callback now
+ this->consumerInfo->removeListener( this );
+ this->consumerInfo->close();
+ } catch( ActiveMQException& ex ){
+ if( !haveException ){
+ ex.setMark( __FILE__, __LINE__ );
error = ex;
haveException = true;
}
}
}
-
+
closed = true;
-
- // Stop the asynchronous message processin thread if it's
- // running.
- try{
- stopThread();
- } catch ( ActiveMQException& ex ){
- if( !haveException ){
- ex.setMark( __FILE__, __LINE__ );
- error = ex;
- haveException = true;
- }
- }
-
+
// Purge all the pending messages
try{
purgeMessages();
} catch ( ActiveMQException& ex ){
- if( !haveException ){
- ex.setMark( __FILE__, __LINE__ );
+ if( !haveException ){
+ ex.setMark( __FILE__, __LINE__ );
error = ex;
haveException = true;
}
}
-
+
// If we encountered an error, propagate it.
if( haveException ){
error.setMark( __FILE__, __LINE__ );
throw error;
}
+
}
}
AMQ_CATCH_RETHROW( ActiveMQException )
@@ -130,7 +118,7 @@
}
////////////////////////////////////////////////////////////////////////////////
-std::string ActiveMQConsumer::getMessageSelector() const
+std::string ActiveMQConsumer::getMessageSelector() const
throw ( cms::CMSException )
{
try
@@ -154,15 +142,15 @@
"ActiveMQConsumer::receive - This Consumer is closed" );
}
- synchronized( &msgQueue )
+ synchronized( &unconsumedMessages )
{
// Check for empty in case of spurious wakeup, or race to
// queue lock.
- while( !closed && msgQueue.empty() )
+ while( !closed && unconsumedMessages.empty() )
{
- msgQueue.wait();
+ unconsumedMessages.wait();
}
-
+
// This will only happen when this object is being
// closed in another thread context - kind of
// scary.
@@ -170,15 +158,16 @@
throw ActiveMQException( __FILE__, __LINE__,
"Consumer is being closed in another thread" );
}
-
+
// Fetch the Message then copy it so it can be handed off
// to the user.
- cms::Message* message = msgQueue.pop();
+ DispatchData data = unconsumedMessages.pop();
+ cms::Message* message = dynamic_cast<cms::Message*>(data.getMessage());
cms::Message* result = message->clone();
// The Message is cleaned up here if the Session is not
// transacted, otherwise we let the transaction clean up
- // this message as it will have already been ack'd and
+ // this message as it will have already been ack'd and
// stored for later redelivery.
destroyMessage( message );
@@ -192,7 +181,7 @@
}
////////////////////////////////////////////////////////////////////////////////
-cms::Message* ActiveMQConsumer::receive( int millisecs )
+cms::Message* ActiveMQConsumer::receive( int millisecs )
throw ( cms::CMSException )
{
try
@@ -204,19 +193,19 @@
"ActiveMQConsumer::receive - This Consumer is closed" );
}
- synchronized( &msgQueue )
+ synchronized( &unconsumedMessages )
{
// Check for empty, and wait if it is
- if( !closed && msgQueue.empty() ){
-
- msgQueue.wait(millisecs);
+ if( !closed && unconsumedMessages.empty() ){
+
+ unconsumedMessages.wait(millisecs);
// if it's still empty...bail
- if( msgQueue.empty() ) {
+ if( unconsumedMessages.empty() ) {
return NULL;
}
}
-
+
// This will only happen when this object is being
// closed in another thread context - kind of
// scary.
@@ -224,15 +213,16 @@
throw ActiveMQException( __FILE__, __LINE__,
"Consumer is being closed in another thread" );
}
-
+
// Fetch the Message then copy it so it can be handed off
// to the user.
- cms::Message* message = msgQueue.pop();
+ DispatchData data = unconsumedMessages.pop();
+ cms::Message* message = dynamic_cast<cms::Message*>(data.getMessage());
cms::Message* result = message->clone();
// The Message is cleaned up here if the Session is not
// transacted, otherwise we let the transaction clean up
- // this message as it will have already been ack'd and
+ // this message as it will have already been ack'd and
// stored for later redelivery.
destroyMessage( message );
@@ -246,7 +236,7 @@
}
////////////////////////////////////////////////////////////////////////////////
-cms::Message* ActiveMQConsumer::receiveNoWait()
+cms::Message* ActiveMQConsumer::receiveNoWait()
throw ( cms::CMSException )
{
try
@@ -258,25 +248,26 @@
"ActiveMQConsumer::receive - This Consumer is closed" );
}
- synchronized( &msgQueue )
+ synchronized( &unconsumedMessages )
{
- if( !msgQueue.empty() )
+ if( !unconsumedMessages.empty() )
{
// Fetch the Message then copy it so it can be handed off
// to the user.
- cms::Message* message = msgQueue.pop();
+ DispatchData data = unconsumedMessages.pop();
+ cms::Message* message = dynamic_cast<cms::Message*>(data.getMessage());
cms::Message* result = message->clone();
// The Message is cleaned up here if the Session is not
// transacted, otherwise we let the transaction clean up
- // this message as it will have already been ack'd and
+ // this message as it will have already been ack'd and
// stored for later redelivery.
destroyMessage( message );
return result;
}
}
-
+
return NULL;
}
AMQ_CATCH_RETHROW( ActiveMQException )
@@ -295,16 +286,24 @@
"ActiveMQConsumer::receive - This Consumer is closed" );
}
- synchronized( &listenerLock )
- {
- this->listener = listener;
- }
-
- // Start the thread if it isn't already running.
- // If it is already running, this method will wake the thread up
- // to notify it that there is a message listener, so that it may
- // get rid of backed up messages.
- startThread();
+ this->listener = listener;
+
+ if( listener != NULL && session != NULL ) {
+
+ // Now that we have a valid message listener,
+ // redispatch all the messages that it missed.
+
+ bool wasStarted = session->isStarted();
+ if( wasStarted ) {
+ session->stop();
+ }
+
+ session->redispatch( unconsumedMessages );
+
+ if( wasStarted ) {
+ session->start();
+ }
+ }
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
@@ -333,70 +332,13 @@
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::run()
-{
- try
- {
- while( !closed )
- {
- Message* message = NULL;
-
- synchronized( &msgQueue )
- {
-
- // Gaurd against spurious wakeup or race to sync lock
- // also if the listner has been unregistered we don't
- // have anyone to notify, so we wait till a new one is
- // registered, and then we will deliver the backlog
- while( msgQueue.empty() || listener == NULL )
- {
- if( closed )
- {
- break;
- }
- msgQueue.wait();
- }
-
- // don't want to process messages if we are shutting down.
- if( closed )
- {
- return;
- }
-
- // Dispatch the message
- message = msgQueue.pop();
- }
-
- // Notify the listener
- notifyListener( message );
-
- // The Message is cleaned up here if the Session is not
- // transacted, otherwise we let the transaction clean up
- // this message as it will have already been ack'd and
- // stored for later redelivery.
- destroyMessage( message );
- }
- }
- catch( ... )
- {
- cms::ExceptionListener* listener = session->getExceptionListener();
-
- if( listener != NULL )
- {
- listener->onException( ActiveMQException(
- __FILE__, __LINE__,
- "ActiveMQConsumer::run - "
- "MessageListener threw an unknown Exception, recovering..." ) );
- }
- }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::dispatch( ActiveMQMessage* message )
- throw ( cms::CMSException )
+void ActiveMQConsumer::dispatch( DispatchData& data )
{
try
{
+
+ ActiveMQMessage* message = data.getMessage();
+
// Don't dispatch expired messages, ack it and then destroy it
if( message->isExpired() ) {
session->acknowledge( this, message );
@@ -405,25 +347,32 @@
// stop now, don't queue
return;
}
-
- // If the Session is in ClientAcknowledge mode, then we set the
+
+ // If the Session is in ClientAcknowledge mode, then we set the
// handler in the message to this object and send it out. Otherwise
// we ack it here for all the other Modes.
if( session->getAcknowledgeMode() == Session::CLIENT_ACKNOWLEDGE ) {
-
+
// Register ourself so that we can handle the Message's
// acknowledge method.
message->setAckHandler( this );
-
+
} else {
session->acknowledge( this, message );
}
-
- // No listener, so we queue it
- synchronized( &msgQueue ) {
-
- msgQueue.push( dynamic_cast< cms::Message* >( message ) );
- msgQueue.notifyAll();
+
+ // If we have a listener, send the message.
+ if( listener != NULL ) {
+ cms::Message* message = dynamic_cast<cms::Message*>(data.getMessage());
+ listener->onMessage( message );
+ destroyMessage( message );
+ } else {
+
+ // No listener, add it to the unconsumed messages list
+ synchronized( &unconsumedMessages ) {
+ unconsumedMessages.push( data );
+ unconsumedMessages.notifyAll();
+ }
}
}
AMQ_CATCH_RETHROW( ActiveMQException )
@@ -435,14 +384,14 @@
{
try
{
- synchronized( &msgQueue )
+ synchronized( &unconsumedMessages )
{
- while( !msgQueue.empty() )
+ while( !unconsumedMessages.empty() )
{
// destroy these messages if this is not a transacted
- // session, if it is then the tranasction will clean
+ // session, if it is then the tranasction will clean
// the messages up.
- destroyMessage( msgQueue.pop() );
+ destroyMessage( dynamic_cast<cms::Message*>(unconsumedMessages.pop().getMessage()) );
}
}
}
@@ -451,46 +400,8 @@
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::onActiveMQMessage( ActiveMQMessage* message )
- throw ( ActiveMQException )
-{
- try
- {
- if( message == NULL )
- {
- throw ActiveMQException(
- __FILE__, __LINE__,
- "ActiveMQConsumer::onActiveMQMessage - Passed a Null Message");
- }
-
- this->dispatch( message );
- }
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::notifyListener( Message* message ) throw ( ActiveMQException ){
-
- try
- {
- MessageListener* listener = NULL;
- synchronized( &listenerLock )
- {
- listener = getMessageListener();
- }
- if(listener != NULL)
- {
- listener->onMessage( message );
- }
- }
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::destroyMessage( Message* message ) throw ( ActiveMQException ){
-
+
try
{
/**
@@ -501,54 +412,7 @@
{
delete message;
}
- }
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::startThread() throw ( ActiveMQException ) {
-
- try
- {
- // Start the thread, if it's not already started.
- if( listenerThread == NULL )
- {
- listenerThread = new Thread( this );
- listenerThread->start();
- }
-
- // notify the Queue so that any pending messages get delivered
- synchronized( &msgQueue )
- {
- msgQueue.notifyAll();
- }
- }
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::stopThread() throw ( ActiveMQException ) {
-
- try
- {
- // if the thread is running signal it to quit and then
- // wait for run to return so thread can die
- if( listenerThread != NULL )
- {
- synchronized( &msgQueue )
- {
- // Force a wakeup if run is in a wait.
- msgQueue.notifyAll();
- }
-
- // Wait for it to die and then delete it.
- listenerThread->join();
- delete listenerThread;
- listenerThread = NULL;
- }
- }
+ }
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
@@ -580,3 +444,4 @@
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
+
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h?view=diff&rev=519590&r1=519589&r2=519590
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h Sun Mar 18 03:54:18 2007
@@ -26,8 +26,7 @@
#include <activemq/connector/ConnectorResourceListener.h>
#include <activemq/util/Queue.h>
#include <activemq/core/ActiveMQAckHandler.h>
-#include <activemq/core/ActiveMQMessageListener.h>
-#include <activemq/concurrent/Runnable.h>
+#include <activemq/core/Dispatcher.h>
#include <activemq/concurrent/Mutex.h>
namespace activemq{
@@ -35,36 +34,29 @@
class ActiveMQSession;
- class ActiveMQConsumer :
+ class ActiveMQConsumer :
public cms::MessageConsumer,
public ActiveMQAckHandler,
- public concurrent::Runnable,
- public ActiveMQMessageListener,
- public connector::ConnectorResourceListener
+ public Dispatcher,
+ public connector::ConnectorResourceListener
{
private:
-
+
// The session that owns this Consumer
ActiveMQSession* session;
-
+
// The Consumer info for this Consumer
connector::ConsumerInfo* consumerInfo;
-
+
// The Message Listener for this Consumer
cms::MessageListener* listener;
-
- // Lock to protect us from dispatching to a dead listener
- concurrent::Mutex listenerLock;
-
- // Message Queue
- util::Queue<cms::Message*> msgQueue;
-
- // Thread to notif a listener if one is added
- concurrent::Thread* listenerThread;
-
+
+ // Queue of unconsumed messages.
+ util::Queue<DispatchData> unconsumedMessages;
+
// Boolean that indicates if the consumer has been closed
bool closed;
-
+
public:
/**
@@ -85,7 +77,7 @@
* @throws CMSException
*/
virtual void close() throw ( cms::CMSException );
-
+
/**
* Synchronously Receive a Message
* @return new message
@@ -129,9 +121,9 @@
* @return This Consumer's selector expression or "".
* @throws cms::CMSException
*/
- virtual std::string getMessageSelector() const
+ virtual std::string getMessageSelector() const
throw ( cms::CMSException );
-
+
/**
* Method called to acknowledge the message passed
* @param message the Message to Acknowlegde
@@ -140,31 +132,16 @@
virtual void acknowledgeMessage( const ActiveMQMessage* message )
throw ( cms::CMSException );
+ public: // Dispatcher Methods
+
/**
- * Run method that is called from the Thread class when this object
- * is registered with a Thread and started. This function reads from
- * the message queue and dispatches calls to the MessageConsumer that
- * is registered with this class.
- *
- * It is a error for a MessageListener to throw an exception in their
- * onMessage method, but if it does happen this function will get any
- * registered exception listener from the session and notify it.
- */
- virtual void run();
-
- public: // ActiveMQMessageListener Methods
-
- /**
- * Called asynchronously when a new message is received, the message
- * that is passed is now the property of the callee, and the caller
- * will disavowe all knowledge of the message, i.e Callee must delete.
+ * Called asynchronously by the session to dispatch a message.
* @param message object pointer
*/
- virtual void onActiveMQMessage( ActiveMQMessage* message )
- throw ( exceptions::ActiveMQException );
-
+ virtual void dispatch( DispatchData& message );
+
public: // ActiveMQSessionResource
-
+
/**
* Retrieve the Connector resource that is associated with
* this Session resource.
@@ -177,19 +154,8 @@
public: // ActiveMQConsumer Methods
/**
- * Called to dispatch a message to this consumer, this is usually
- * called from the context of another thread. This will enqueue a
- * message on the Consumers Queue, or notify a listener if one is
- * currently registered.
- * @param message cms::Message pointer to the message to dispatch
- * @throws cms::CMSException
- */
- virtual void dispatch( ActiveMQMessage* message )
- throw ( cms::CMSException );
-
- /**
* Get the Consumer information for this consumer
- * @return Pointer to a Consumer Info Object
+ * @return Pointer to a Consumer Info Object
*/
virtual connector::ConsumerInfo* getConsumerInfo() {
return consumerInfo;
@@ -207,40 +173,20 @@
const connector::ConnectorResource* resource ) throw ( cms::CMSException );
protected:
-
+
/**
* Purges all messages currently in the queue. This can be as a
* result of a rollback, or of the consumer being shutdown.
*/
virtual void purgeMessages() throw (exceptions::ActiveMQException);
-
+
/**
* Destroys the message if the session is transacted, otherwise
* does nothing.
* @param message the message to destroy
*/
- virtual void destroyMessage( cms::Message* message )
- throw (exceptions::ActiveMQException);
-
- /**
- * Notifies the listener of a message.
- * @param message the message to pass to the listener
- */
- void notifyListener( cms::Message* message )
+ virtual void destroyMessage( cms::Message* message )
throw (exceptions::ActiveMQException);
-
- /**
- * Starts the message processing thread to receive messages
- * asynchronously. This thread is started when setMessageListener
- * is invoked, which means that the caller is choosing to use this
- * consumer asynchronously instead of synchronously (receive).
- */
- void startThread() throw (exceptions::ActiveMQException);
-
- /**
- * Stops the asynchronous message processing thread if it's started.
- */
- void stopThread() throw (exceptions::ActiveMQException);
};
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?view=diff&rev=519590&r1=519589&r2=519590
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Sun Mar 18 03:54:18 2007
@@ -24,6 +24,7 @@
#include <activemq/core/ActiveMQConsumer.h>
#include <activemq/core/ActiveMQMessage.h>
#include <activemq/core/ActiveMQProducer.h>
+#include <activemq/core/ActiveMQSessionExecutor.h>
#include <activemq/util/Boolean.h>
#include <activemq/connector/TransactionInfo.h>
@@ -40,7 +41,8 @@
////////////////////////////////////////////////////////////////////////////////
ActiveMQSession::ActiveMQSession( SessionInfo* sessionInfo,
const Properties& properties,
- ActiveMQConnection* connection)
+ ActiveMQConnection* connection,
+ bool sessionAsyncDispatch )
{
if( sessionInfo == NULL || connection == NULL )
{
@@ -54,6 +56,7 @@
this->connection = connection;
this->closed = false;
this->asyncThread = NULL;
+ this->sessionAsyncDispatch = sessionAsyncDispatch;
this->useAsyncSend = Boolean::parseBoolean(
properties.getProperty( "useAsyncSend", "false" ) );
@@ -69,6 +72,9 @@
transaction =
new ActiveMQTransaction(connection, this, properties );
}
+
+ // Create the session executor object.
+ executor = new ActiveMQSessionExecutor( this );
}
////////////////////////////////////////////////////////////////////////////////
@@ -93,11 +99,14 @@
try
{
+ // Stop the dispatch executor.
+ stop();
+
// Get the complete list of closeable session resources.
std::vector<cms::Closeable*> allResources;
synchronized( &closableSessionResources ) {
allResources = closableSessionResources.toArray();
- }
+ }
// Close all of the resources.
for( unsigned int ix=0; ix<allResources.size(); ++ix ){
@@ -127,6 +136,9 @@
// Remove any unsent cloned messages.
purgeMessages();
+
+ delete executor;
+ executor = NULL;
}
AMQ_CATCH_NOTHROW( ActiveMQException )
AMQ_CATCHALL_NOTHROW( )
@@ -227,31 +239,38 @@
"ActiveMQSession::createConsumer - Session Already Closed" );
}
- ConsumerInfo* consumerInfo =
+ ConsumerInfo* consumerInfo =
connection->getConnectionData()->getConnector()->
createConsumer( destination,
sessionInfo,
selector,
noLocal );
- // Add to Session Closeables and Monitor for close, if needed.
+ // Add to Session Closeables and Monitor for close, if needed.
checkConnectorResource(
dynamic_cast<ConnectorResource*>( consumerInfo ) );
- // Create the consumer instance.
+ // Create the consumer instance.
ActiveMQConsumer* consumer = new ActiveMQConsumer(
consumerInfo, this );
- // Register this consumer as a listener of messages from the
- // connection.
- connection->addMessageListener(
- consumer->getConsumerInfo()->getConsumerId(), consumer );
+
+ // Add the consumer to the map.
+ synchronized( &consumers ) {
+ consumers.setValue( consumerInfo->getConsumerId(), consumer );
+ }
+
+ // Register this as a message dispatcher for the consumer.
+ connection->addDispatcher( consumerInfo, this );
// Start the Consumer, we are now ready to receive messages
try{
connection->getConnectionData()->getConnector()->startConsumer(
- consumer->getConsumerInfo() );
+ consumerInfo );
} catch( ActiveMQException& ex ) {
+ synchronized( &consumers ) {
+ consumers.remove( consumerInfo->getConsumerId() );
+ }
delete consumer;
ex.setMark( __FILE__, __LINE__ );
throw ex;
@@ -293,16 +312,22 @@
ActiveMQConsumer* consumer = new ActiveMQConsumer(
consumerInfo, this );
- // Register the consumer as a listener of messages from the
- // connection.
- connection->addMessageListener(
- consumer->getConsumerInfo()->getConsumerId(), consumer );
+ // Add the consumer to the map.
+ synchronized( &consumers ) {
+ consumers.setValue( consumerInfo->getConsumerId(), consumer );
+ }
+
+ // Register this as a message dispatcher for the consumer.
+ connection->addDispatcher( consumerInfo, this );
// Start the Consumer, we are now ready to receive messages
try{
connection->getConnectionData()->getConnector()->startConsumer(
- consumer->getConsumerInfo() );
+ consumerInfo );
} catch( ActiveMQException& ex ) {
+ synchronized( &consumers ) {
+ consumers.remove( consumerInfo->getConsumerId() );
+ }
delete consumer;
ex.setMark( __FILE__, __LINE__ );
throw ex;
@@ -582,7 +607,7 @@
}
cms::MapMessage* message = connection->getConnectionData()->
- getConnector()->createMapMessage( sessionInfo, transaction );
+ getConnector()->createMapMessage( sessionInfo, transaction );
// Add to Session Closeables and Monitor for close, if needed.
checkConnectorResource(
@@ -653,7 +678,7 @@
"ActiveMQSession::onProducerClose - Session Already Closed" );
}
- if( useAsyncSend == true ) {
+ if( useAsyncSend ) {
// Put it in the send queue, thread will dispatch it. We clone it
// in case the client deletes their copy before we get a chance to
@@ -689,11 +714,10 @@
const ConsumerInfo* consumer =
dynamic_cast<const ConsumerInfo*>( resource );
- if( consumer != NULL ) {
-
- // Remove this Consumer from the Connection
- connection->removeMessageListener(
- consumer->getConsumerId() );
+ if( consumer != NULL )
+ {
+ // Remove the dispatcher for the Connection
+ connection->removeDispatcher( consumer );
// Remove this consumer from the Transaction if we are
// transactional
@@ -701,6 +725,11 @@
transaction->removeFromTransaction(
consumer->getConsumerId() );
}
+
+ // Remove this consumer from the consumers map.
+ synchronized( &consumers ) {
+ consumers.remove( consumer->getConsumerId() );
+ }
}
// Remove the entry from the session resource map if it's there
@@ -884,3 +913,58 @@
// Register as a Listener
resource->addListener( this );
}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::dispatch( DispatchData& message ) {
+ // Forward to the session executor; nothing happens when executor is NULL.
+ if( executor != NULL ) {
+ executor->execute( message );
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::redispatch( util::Queue<DispatchData>& unconsumedMessages )
+{
+ util::Queue<DispatchData> reversedList;
+ // reversedList is local to this call, so it is used without locking.
+ // Copy the list in reverse order then clear the original list.
+ synchronized( &unconsumedMessages ) {
+ unconsumedMessages.reverse( reversedList );
+ unconsumedMessages.clear();
+ }
+ // NOTE(review): executor is dereferenced below without the NULL check that dispatch() has - confirm it cannot be NULL here.
+ // Add the list to the front of the executor, last entry first, so queued entries regain their original order.
+ while( !reversedList.empty() ) {
+ DispatchData data = reversedList.pop();
+ executor->executeFirst( data );
+ }
+
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::start()
+{
+ if( executor != NULL ) { // no-op when there is no executor
+ executor->start(); // delegate: the executor owns the started state
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::stop()
+{
+ if( executor != NULL ) { // no-op when there is no executor
+ executor->stop(); // delegate: the executor owns the started state
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQSession::isStarted() const
+{
+ if( executor == NULL ) { // without an executor the session reports "not started"
+ return false;
+ }
+ // Otherwise the session's started state is exactly the executor's.
+ return executor->isStarted();
+}
+
+
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h?view=diff&rev=519590&r1=519589&r2=519590
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h Sun Mar 18 03:54:18 2007
@@ -22,9 +22,11 @@
#include <activemq/concurrent/Runnable.h>
#include <activemq/concurrent/Mutex.h>
#include <activemq/connector/SessionInfo.h>
+#include <activemq/core/Dispatcher.h>
#include <activemq/util/Set.h>
#include <activemq/util/Queue.h>
#include <activemq/connector/ConnectorResourceListener.h>
+#include <activemq/util/Map.h>
#include <set>
namespace activemq{
@@ -36,9 +38,11 @@
class ActiveMQMessage;
class ActiveMQProducer;
class ActiveMQConsumer;
+ class ActiveMQSessionExecutor;
class ActiveMQSession :
public cms::Session,
+ public Dispatcher,
public concurrent::Runnable,
public connector::ConnectorResourceListener
{
@@ -70,6 +74,11 @@
* destination.
*/
util::Set<cms::Closeable*> closableSessionResources;
+
+ /**
+ * Map of consumers, keyed by consumer ID.
+ */
+ util::Map<long long, ActiveMQConsumer*> consumers;
/**
* Thread to notify a listener if one is added
@@ -80,19 +89,68 @@
* Is this Session using Async Sends.
*/
bool useAsyncSend;
+
+ /**
+ * Indicates whether or not dispatching should be done asynchronously.
+ */
+ bool sessionAsyncDispatch;
/**
* Outgoing Message Queue
*/
util::Queue< std::pair<cms::Message*, ActiveMQProducer*> > msgQueue;
+
+ ActiveMQSessionExecutor* executor;
public:
ActiveMQSession( connector::SessionInfo* sessionInfo,
const util::Properties& properties,
- ActiveMQConnection* connection );
+ ActiveMQConnection* connection,
+ bool sessionAsyncDispatch );
virtual ~ActiveMQSession();
+
+ /**
+ * Indicates whether or not dispatching should be done asynchronously.
+ */
+ bool isSessionAsyncDispatch() const {
+ return sessionAsyncDispatch;
+ }
+
+ util::Map<long long, ActiveMQConsumer*>& getConsumers() {
+ return consumers;
+ }
+
+ /**
+ * Redispatches the given set of unconsumed messages to the consumers.
+ * @param unconsumedMessages - unconsumed messages to be redelivered.
+ */
+ void redispatch( util::Queue<DispatchData>& unconsumedMessages );
+
+ /**
+ * Starts asynchronous message delivery.
+ */
+ void start();
+
+ /**
+ * Stops asynchronous message delivery.
+ */
+ void stop();
+
+ /**
+ * Indicates whether or not the session is currently in the started
+ * state.
+ */
+ bool isStarted() const;
+
+ public: // Methods from Dispatcher
+
+ /**
+ * Dispatches a message to a particular consumer.
+ * @param message - the message to be dispatched
+ */
+ virtual void dispatch( DispatchData& message );
public: // Implements Methods
@@ -267,21 +325,21 @@
* @return transacted true - false.
*/
virtual bool isTransacted() const;
-
+
/**
- * Unsubscribes a durable subscription that has been created by a
+ * Unsubscribes a durable subscription that has been created by a
* client.
- *
- * This method deletes the state being maintained on behalf of the
- * subscriber by its provider. It is erroneous for a client to delete a
- * durable subscription while there is an active MessageConsumer or
- * Subscriber for the subscription, or while a consumed message is
- * part of a pending transaction or has not been acknowledged in the
+ *
+ * This method deletes the state being maintained on behalf of the
+ * subscriber by its provider. It is erroneous for a client to delete a
+ * durable subscription while there is an active MessageConsumer or
+ * Subscriber for the subscription, or while a consumed message is
+ * part of a pending transaction or has not been acknowledged in the
* session.
* @param name the name used to identify this subscription
* @throws CMSException
*/
- virtual void unsubscribe( const std::string& name )
+ virtual void unsubscribe( const std::string& name )
throw ( cms::CMSException );
public: // ActiveMQSession specific Methods
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp?view=auto&rev=519590
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp Sun Mar 18 03:54:18 2007
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ActiveMQSessionExecutor.h"
+#include "ActiveMQSession.h"
+#include "ActiveMQMessage.h"
+#include "ActiveMQConsumer.h"
+#include <activemq/connector/ConsumerInfo.h>
+
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::util;
+using namespace activemq::concurrent;
+using namespace activemq::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQSessionExecutor::ActiveMQSessionExecutor( ActiveMQSession* session ) {
+ this->session = session; // pointer is stored as given; no copy is made
+ this->started = false; // dispatching stays off until start() is called
+ this->thread = NULL; // the worker thread is created later, by start()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQSessionExecutor::~ActiveMQSessionExecutor() {
+ // The session pointer is left untouched; this destructor does not delete it.
+ // Stop the thread if it's running.
+ stop();
+
+ // Empty the message queue and destroy any remaining messages.
+ clear();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionExecutor::execute( DispatchData& data ) {
+ // NOTE(review): unlike executeFirst(), the synchronous path here ignores 'started' - confirm this is intended.
+ // If dispatch async is off, just dispatch in the context of the transport
+ // thread.
+ if ( !session->isSessionAsyncDispatch() ){
+ dispatch(data);
+ }else {
+ // Async mode: append to the back of the queue, then signal via wakeup().
+ synchronized( &messageQueue ) {
+ messageQueue.push(data);
+ wakeup();
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionExecutor::executeFirst( DispatchData& data ) {
+
+ // If dispatch async is off and the executor is started, dispatch right away
+ // in the calling thread; otherwise put the data at the front of the queue.
+ if ( !session->isSessionAsyncDispatch() && started ){
+ dispatch(data);
+ }else {
+ // Reached in async mode, and in sync mode while the executor is stopped.
+ synchronized( &messageQueue ) {
+ messageQueue.enqueueFront(data);
+ wakeup();
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionExecutor::start() {
+ // Everything below runs while synchronized on the message queue.
+ synchronized( &messageQueue ) {
+ started = true;
+
+ // Don't create the thread unless we need to.
+ if( session->isSessionAsyncDispatch() && thread == NULL ) {
+ thread = new Thread( this );
+ thread->start();
+ }
+ // Async: notifies waiters. Sync: dispatches whatever is already queued.
+ wakeup();
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionExecutor::stop() {
+ // Clear the flag under the queue lock and wake any waiter so run() can see it.
+ synchronized( &messageQueue ) {
+
+ started = false;
+ wakeup();
+ }
+ // The join happens after the synchronized block has been left.
+ if( thread != NULL ) {
+ thread->join();
+ delete thread;
+ thread = NULL;
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionExecutor::clear() {
+ // Drops every queued entry, deleting the message each entry carries.
+ synchronized( &messageQueue ) {
+
+ while( !messageQueue.empty() ) {
+ DispatchData data = messageQueue.pop();
+ delete data.getMessage();
+ }
+
+ wakeup();
+ }
+
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionExecutor::dispatch( DispatchData& data ) {
+ // Look up the target consumer by its ID in the session's consumer map.
+ ActiveMQConsumer* consumer = NULL;
+ util::Map<long long, ActiveMQConsumer*>& consumers = session->getConsumers();
+
+ synchronized(&consumers) {
+ consumer = consumers.getValue( data.getConsumer()->getConsumerId() );
+ }
+ // NOTE(review): if the lookup yields NULL the message is neither dispatched nor deleted here - confirm ownership.
+ if( consumer != NULL ) {
+ consumer->dispatch( data );
+ }
+
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionExecutor::run() {
+ // Worker loop: drain the queue, then wait on it; the loop ends once 'started' is false.
+ try {
+
+ while( started ) {
+
+ // Dispatch all currently available messages.
+ dispatchAll();
+
+ synchronized( &messageQueue ) {
+ // Both conditions are re-checked inside the synchronized block before waiting.
+ if( messageQueue.empty() && started ) {
+
+ // Wait for more data or to be woken up.
+ messageQueue.wait();
+ }
+ }
+ }
+ // NOTE(review): the try wraps the whole loop, so any exception from dispatchAll() ends this thread's loop.
+ } catch( ActiveMQException& ex ) {
+ ex.printStackTrace();
+ } catch( ... ) {
+ ActiveMQException ex(__FILE__, __LINE__, "caught unknown exception" );
+ ex.printStackTrace();
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionExecutor::dispatchAll() {
+
+ // Take out all of the dispatch data currently in the array.
+ std::vector<DispatchData> dataList;
+ synchronized( &messageQueue ) {
+ // Copy the queue contents, then empty the queue, in one synchronized step.
+ dataList = messageQueue.toArray();
+ messageQueue.clear();
+ }
+
+ // Dispatch all currently available messages.
+ for( unsigned int ix=0; ix<dataList.size(); ++ix ) {
+ DispatchData& data = dataList[ix];
+ dispatch( data );
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionExecutor::wakeup() {
+ // Async: notify every thread waiting on the queue. Sync: drain the queue now, but only if started.
+ if( session->isSessionAsyncDispatch() ) {
+ synchronized( &messageQueue ) {
+ messageQueue.notifyAll();
+ }
+ } else if( started ){
+ dispatchAll();
+ }
+}
+
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h?view=auto&rev=519590
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h Sun Mar 18 03:54:18 2007
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_CORE_ACTIVEMQSESSIONEXECUTOR_
+#define ACTIVEMQ_CORE_ACTIVEMQSESSIONEXECUTOR_
+
+#include <activemq/core/Dispatcher.h>
+#include <activemq/concurrent/Runnable.h>
+#include <activemq/concurrent/Thread.h>
+#include <activemq/util/Queue.h>
+
+namespace activemq{
+namespace core{
+
+ class ActiveMQSession;
+
+ /**
+ * Delegate dispatcher for a single session. Contains a thread
+ * to provide for asynchronous dispatching.
+ */
+ class ActiveMQSessionExecutor :
+ public Dispatcher,
+ public concurrent::Runnable
+ {
+ private:
+
+ ActiveMQSession* session; // session this executor dispatches for
+ util::Queue<DispatchData> messageQueue; // pending dispatch data; also the object synchronized on
+ bool started; // true between start() and stop()
+ concurrent::Thread* thread; // worker thread, NULL until start() creates it
+
+ public:
+
+ /**
+ * Creates an un-started executor for the given session.
+ */
+ ActiveMQSessionExecutor( ActiveMQSession* session );
+
+ /**
+ * Calls stop() then clear().
+ */
+ virtual ~ActiveMQSessionExecutor();
+
+ /**
+ * Executes the dispatch. In async mode adds the given data to the
+ * end of the queue; otherwise dispatches it immediately.
+ * @param data - the data to be dispatched.
+ */
+ virtual void execute( DispatchData& data );
+
+ /**
+ * Executes the dispatch. Adds the given data to the beginning of the
+ * queue, or dispatches it at once if the session is sync and started.
+ * @param data - the data to be dispatched.
+ */
+ virtual void executeFirst( DispatchData& data );
+
+ /**
+ * Starts the dispatching.
+ */
+ virtual void start();
+
+ /**
+ * Stops dispatching.
+ */
+ virtual void stop();
+
+ /**
+ * Indicates if the executor is started
+ */
+ virtual bool isStarted() const {
+ return started;
+ }
+
+ /**
+ * Removes all queued messages and destroys them.
+ */
+ virtual void clear();
+
+ /**
+ * Depending on whether or not the session is async,
+ * notifies the thread or simply dispatches all available
+ * messages synchronously.
+ */
+ virtual void wakeup();
+
+ /**
+ * Dispatches a message to a particular consumer.
+ * @param data - The dispatch data, which carries both the target
+ * consumer and the message to be dispatched.
+ */
+ virtual void dispatch( DispatchData& data );
+
+ /**
+ * Run method - called by the Thread class in the context
+ * of the thread.
+ */
+ virtual void run();
+
+ private:
+
+ /**
+ * Dispatches all messages currently in the queue.
+ */
+ void dispatchAll();
+ };
+
+}}
+
+#endif /*ACTIVEMQ_CORE_ACTIVEMQSESSIONEXECUTOR_*/
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp?view=diff&rev=519590&r1=519589&r2=519590
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp Sun Mar 18 03:54:18 2007
@@ -341,28 +341,30 @@
for( ; itr != messages.end(); ++itr )
{
- ( *itr )->setRedeliveryCount( ( *itr )->getRedeliveryCount() + 1 );
+ ActiveMQMessage* message = *itr;
+ message->setRedeliveryCount( message->getRedeliveryCount() + 1 );
// Redeliver Messages at some point in the future
Thread::sleep( redeliveryDelay );
- if( ( *itr )->getRedeliveryCount() >= maxRedeliveries )
+ if( message->getRedeliveryCount() >= maxRedeliveries )
{
// Poison Ack the Message, we give up processing this one
connection->getConnectionData()->getConnector()->
acknowledge(
session->getSessionInfo(),
consumer->getConsumerInfo(),
- dynamic_cast< Message* >( *itr ),
+ dynamic_cast< Message* >( message ),
Connector::PoisonAck );
// Won't redeliver this so we kill it here.
- delete *itr;
+ delete message;
return;
}
- consumer->onActiveMQMessage( *itr );
+ DispatchData data( consumer->getConsumerInfo(), message );
+ consumer->dispatch( data );
}
}
AMQ_CATCH_RETHROW( ActiveMQException )
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.h?view=diff&rev=519590&r1=519589&r2=519590
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.h Sun Mar 18 03:54:18 2007
@@ -78,7 +78,7 @@
// Transaction Info for the current Transaction
connector::TransactionInfo* transactionInfo;
- // Map of ActiveMQMessageListener to Messages to Rollback
+ // Map of ActiveMQMessageConsumer to Messages to Rollback
RollbackMap rollbackMap;
// Lock object to protect the rollback Map
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/DispatchData.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/DispatchData.h?view=auto&rev=519590
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/DispatchData.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/DispatchData.h Sun Mar 18 03:54:18 2007
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_CORE_DISPATCHDATA_H_
+#define ACTIVEMQ_CORE_DISPATCHDATA_H_
+
+#include <stdlib.h>
+
+namespace activemq {
+
+ namespace connector {
+ class ConsumerInfo;
+ }
+
+namespace core {
+
+ class ActiveMQMessage;
+
+ /**
+ * Contains information about dispatching to a particular consumer.
+ */
+ class DispatchData {
+ private:
+ // Both pointers are stored as given; this class never deletes them.
+ connector::ConsumerInfo* consumer;
+ ActiveMQMessage* message;
+
+ public:
+ // Default-constructed data refers to no consumer and no message.
+ DispatchData(){
+ consumer = NULL;
+ message = NULL;
+ }
+ // Binds the data to the given consumer info and message.
+ DispatchData( connector::ConsumerInfo* consumer, ActiveMQMessage* message ) {
+ this->consumer = consumer;
+ this->message = message;
+ }
+ // Copy construction delegates to the assignment operator below.
+ DispatchData( const DispatchData& d ) {
+ (*this) = d;
+ }
+ // Assignment copies the two pointers only (shallow copy).
+ DispatchData& operator =( const DispatchData& d ) {
+ this->consumer = d.consumer;
+ this->message = d.message;
+ return *this;
+ }
+ // Returns the stored consumer info pointer (NULL if default-constructed).
+ connector::ConsumerInfo* getConsumer() {
+ return consumer;
+ }
+ // Returns the stored message pointer (NULL if default-constructed).
+ ActiveMQMessage* getMessage() {
+ return message;
+ }
+
+ };
+}}
+
+#endif /*ACTIVEMQ_CORE_DISPATCHDATA_H_*/
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Dispatcher.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Dispatcher.h?view=auto&rev=519590
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Dispatcher.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Dispatcher.h Sun Mar 18 03:54:18 2007
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_CORE_DISPATCHER_H_
+#define ACTIVEMQ_CORE_DISPATCHER_H_
+
+#include <activemq/core/DispatchData.h>
+
+namespace activemq{
+namespace core{
+
+ /**
+ * Interface for an object responsible for dispatching messages to
+ * consumers.
+ */
+ class Dispatcher {
+ public:
+ // Virtual destructor so implementations can be destroyed through a Dispatcher pointer.
+ virtual ~Dispatcher(){}
+
+ /**
+ * Dispatches a message to a particular consumer.
+ * @param message - the dispatch data (consumer and message) to be dispatched.
+ */
+ virtual void dispatch( DispatchData& message ) = 0;
+ };
+
+}}
+
+#endif /*ACTIVEMQ_CORE_DISPATCHER_H_*/
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/Queue.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/Queue.h?view=diff&rev=519590&r1=519589&r2=519590
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/Queue.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/Queue.h Sun Mar 18 03:54:18 2007
@@ -17,7 +17,8 @@
#ifndef ACTIVEMQ_UTIL_QUEUE_H
#define ACTIVEMQ_UTIL_QUEUE_H
-#include <queue>
+#include <list>
+#include <vector>
#include <activemq/concurrent/Mutex.h>
#include <activemq/exceptions/ActiveMQException.h>
@@ -62,6 +63,11 @@
virtual ~Queue(void);
/**
+ * Empties this queue.
+ */
+ void clear();
+
+ /**
* Returns a Reference to the element at the head of the queue
* @return reference to a queue type object or (safe)
*/
@@ -90,6 +96,12 @@
* @param t - Queue Object Type reference.
*/
void push( const T &t );
+
+ /**
+ * Places a new Object at the front of the queue
+ * @param t - Queue Object Type reference.
+ */
+ void enqueueFront( const T &t );
/**
* Removes and returns the element that is at the Head of the queue
@@ -108,6 +120,19 @@
* @return boolean indicating queue emptiness
*/
bool empty(void) const;
+
+ /**
+ * @return all values in this queue as a std::vector.
+ */
+ virtual std::vector<T> toArray() const;
+
+ /**
+ * Reverses the order of the contents of this queue and stores them
+ * in the target queue.
+ * @param target - The target queue that will receive the contents of
+ * this queue in reverse order.
+ */
+ void reverse( Queue<T>& target ) const;
/**
* Locks the object.
@@ -176,7 +201,7 @@
private:
// The real queue
- std::queue<T> queue;
+ std::list<T> queue;
// Object used for sync
concurrent::Mutex mutex;
@@ -190,7 +215,7 @@
//-----{ Static Init }----------------------------------------------------//
template <typename T>
T Queue<T>::safe;
-
+
//-----{ Retrieve current length of Queue }-------------------------------//
template <typename T> inline size_t Queue<T>::size() const
@@ -218,12 +243,24 @@
{
}
+ template <typename T>
+ void Queue<T>::clear()
+ {
+ queue.clear(); // removes every element; no locking is done in this method
+ }
+
//-----{ Add Elements to Back of Queue }----------------------------------//
template <typename T>
void Queue<T>::push( const T &t )
{
- queue.push( t );
+ queue.push_back( t );
+ }
+
+ template <typename T>
+ void Queue<T>::enqueueFront( const T &t )
+ {
+ queue.push_front( t ); // t becomes the new head, ahead of all existing elements
}
//-----{ Remove Elements from Front of Queue }----------------------------//
@@ -239,7 +276,7 @@
// Pop the element into a temp, since we need to remain locked.
// this means getting front and then popping.
T temp = queue.front();
- queue.pop();
+ queue.pop_front();
return temp;
}
@@ -294,6 +331,30 @@
}
return queue.back();
+ }
+
+ template <typename T>
+ void Queue<T>::reverse( Queue<T>& target ) const
+ {
+ typename std::list<T>::const_reverse_iterator iter;
+ iter = queue.rbegin(); // walk this queue from tail to head
+ for( ; iter != queue.rend(); ++iter ) {
+ target.push( *iter ); // appended to target; target's existing contents are kept
+ }
+ }
+
+ ////////////////////////////////////////////////////////////////////////////
+ template <typename T>
+ std::vector<T> Queue<T>::toArray() const{
+ std::vector<T> valueArray(queue.size());
+ // The vector is pre-sized (so T must be default-constructible), then filled head to tail.
+ typename std::list<T>::const_iterator iter;
+ iter=queue.begin();
+ for( int ix=0; iter != queue.end(); ++iter, ++ix ){
+ valueArray[ix] = *iter;
+ }
+
+ return valueArray;
}
}}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.h?view=diff&rev=519590&r1=519589&r2=519590
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.h Sun Mar 18 03:54:18 2007
@@ -43,13 +43,13 @@
* @throws CMSException
*/
virtual void close() throw( CMSException ) = 0;
-
- /**
- * Creates a new Session to work for this Connection
+
+ /**
+ * Creates an AUTO_ACKNOWLEDGE Session.
* @throws CMSException
*/
virtual Session* createSession() throw ( CMSException ) = 0;
-
+
/**
* Creates a new Session to work for this Connection using the
* specified acknowledgment mode
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTempDestinationTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTempDestinationTest.cpp?view=diff&rev=519590&r1=519589&r2=519590
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTempDestinationTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTempDestinationTest.cpp Sun Mar 18 03:54:18 2007
@@ -102,7 +102,7 @@
// Create CMS Object for Comms
cms::Session* session = testSupport.getSession();
- cms::Destination* requestTopic = session->createTopic( "myRequestTopic" );
+ cms::Destination* requestTopic = session->createTopic( Guid::createGUIDString() );
cms::Destination* responseTopic = session->createTemporaryTopic();
Consumer* requestConsumer = new Consumer( testSupport.getConnection(),
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompSessionManagerTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompSessionManagerTest.h?view=diff&rev=519590&r1=519589&r2=519590
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompSessionManagerTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompSessionManagerTest.h Sun Mar 18 03:54:18 2007
@@ -139,6 +139,8 @@
delete info2;
delete info3;
delete info4;
+
+ delete connector;
}
void testConsumers()
@@ -194,6 +196,8 @@
delete cinfo2;
delete cinfo3;
delete cinfo4;
+
+ delete connector;
}
void testCommand()
@@ -260,6 +264,8 @@
delete cinfo2;
delete cinfo3;
delete cinfo4;
+
+ delete connector;
}
void testSendingCommands(){
@@ -304,6 +310,8 @@
delete cinfo1;
delete cinfo2;
+
+ delete connector;
}
void testSubscribeOptions(){
@@ -427,6 +435,8 @@
// Done
delete session;
+
+ delete connector;
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.h?view=diff&rev=519590&r1=519589&r2=519590
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.h Sun Mar 18 03:54:18 2007
@@ -110,19 +110,19 @@
}
};
- class MyActiveMQMessageListener : public ActiveMQMessageListener
+ class MyDispatcher : public Dispatcher
{
public:
std::vector<ActiveMQMessage*> messages;
public:
- virtual ~MyActiveMQMessageListener(){}
+ virtual ~MyDispatcher(){}
- virtual void onActiveMQMessage( ActiveMQMessage* message )
+ virtual void dispatch( DispatchData& data )
throw ( exceptions::ActiveMQException )
{
- messages.push_back( message );
+ messages.push_back( data.getMessage() );
}
};
@@ -136,7 +136,7 @@
MyMessageListener listener;
MyExceptionListener exListener;
MyCommandListener cmdListener;
- MyActiveMQMessageListener msgListener;
+ MyDispatcher msgListener;
std::string connectionId = "testConnectionId";
util::SimpleProperties* properties =
new util::SimpleProperties();
@@ -198,7 +198,7 @@
consumer.setSessionInfo( &session );
consumer.setDestination( &myTopic );
- connection.addMessageListener( 1, &msgListener );
+ connection.addDispatcher( &consumer, &msgListener );
connector::stomp::commands::TextMessageCommand* cmd =
new connector::stomp::commands::TextMessageCommand;
@@ -217,14 +217,14 @@
CPPUNIT_ASSERT( msgListener.messages.size() == 1 );
- connection.removeMessageListener( 1 );
+ connection.removeDispatcher( &consumer );
msgListener.messages.clear();
consumerListener->onConsumerMessage( &consumer, cmd );
CPPUNIT_ASSERT( msgListener.messages.size() == 0 );
- connection.addMessageListener( 1, &msgListener );
+ connection.addDispatcher( &consumer, &msgListener );
connection.stop();
consumerListener->onConsumerMessage( &consumer, cmd );
@@ -239,7 +239,7 @@
consumerListener->onConsumerMessage( &consumer, cmd );
CPPUNIT_ASSERT( msgListener.messages.size() == 1 );
- connection.removeMessageListener( 1 );
+ connection.removeDispatcher( &consumer );
msgListener.messages.clear();
session1->close();
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h?view=diff&rev=519590&r1=519589&r2=519590
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h Sun Mar 18 03:54:18 2007
@@ -368,7 +368,7 @@
}
}
- CPPUNIT_ASSERT( msgListener1.messages.size() == 1 );
+ CPPUNIT_ASSERT_EQUAL( 1, (int)msgListener1.messages.size() );
msgListener1.messages[0]->acknowledge();
@@ -382,7 +382,7 @@
}
}
- CPPUNIT_ASSERT( msgListener2.messages.size() == 1 );
+ CPPUNIT_ASSERT_EQUAL( 1, (int)msgListener2.messages.size() );
msgListener2.messages[0]->acknowledge();
@@ -459,7 +459,7 @@
}
}
- CPPUNIT_ASSERT( msgListener1.messages.size() == 1 );
+ CPPUNIT_ASSERT_EQUAL( 1, (int)msgListener1.messages.size() );
session->commit();