You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by nm...@apache.org on 2007/03/18 11:54:19 UTC

svn commit: r519590 - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/ main/activemq/core/ main/activemq/util/ main/cms/ test-integration/integration/connector/openwire/ test/activemq/connector/stomp/ test/activemq/core/

Author: nmittler
Date: Sun Mar 18 03:54:18 2007
New Revision: 519590

URL: http://svn.apache.org/viewvc?view=rev&rev=519590
Log:
AMQCPP-90 - moving the async thread to session instead of consumers

Added:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/DispatchData.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Dispatcher.h
Removed:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessageListener.h
Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/Queue.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTempDestinationTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompSessionManagerTest.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am?view=diff&rev=519590&r1=519589&r2=519590
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am Sun Mar 18 03:54:18 2007
@@ -23,6 +23,7 @@
     activemq/core/ActiveMQTransaction.cpp \
     activemq/core/ActiveMQProducer.cpp \
     activemq/core/ActiveMQConnectionFactory.cpp \
+    activemq/core/ActiveMQSessionExecutor.cpp \
     activemq/logger/LoggerHierarchy.cpp \
     activemq/logger/LogWriter.cpp \
     activemq/logger/Logger.cpp \
@@ -96,12 +97,14 @@
     activemq/core/ActiveMQDestination.h \
     activemq/core/ActiveMQConnection.h \
     activemq/core/ActiveMQTransaction.h \
-    activemq/core/ActiveMQMessageListener.h \
     activemq/core/ActiveMQConnectionFactory.h \
     activemq/core/ActiveMQConsumer.h \
     activemq/core/ActiveMQSession.h \
     activemq/core/ActiveMQAckHandler.h \
     activemq/core/ActiveMQConstants.h \
+    activemq/core/ActiveMQSessionExecutor.h \
+    activemq/core/DispatchData.h \
+    activemq/core/Dispatcher.h \
     activemq/logger/LoggerCommon.h \
     activemq/logger/LogRecord.h \
     activemq/logger/SimpleLogger.h \

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?view=diff&rev=519590&r1=519589&r2=519590
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Sun Mar 18 03:54:18 2007
@@ -21,10 +21,12 @@
 #include <activemq/core/ActiveMQSession.h>
 #include <activemq/core/ActiveMQConsumer.h>
 #include <activemq/exceptions/NullPointerException.h>
+#include <activemq/util/Boolean.h>
 
 using namespace cms;
 using namespace activemq;
 using namespace activemq::core;
+using namespace activemq::util;
 using namespace activemq::connector;
 using namespace activemq::exceptions;
 using namespace std;
@@ -36,6 +38,9 @@
     this->started = false;
     this->closed = false;
     this->exceptionListener = NULL;
+    
+    alwaysSessionAsync = Boolean::parseBoolean(
+        connectionData->getProperties().getProperty( "alwaysSessionAsync", "true" ) );
 
     // Register for messages and exceptions from the connector.
     Connector* connector = connectionData->getConnector();
@@ -55,34 +60,63 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-cms::Session* ActiveMQConnection::createSession()
-    throw ( cms::CMSException )
+void ActiveMQConnection::addDispatcher( connector::ConsumerInfo* consumer, 
+    Dispatcher* dispatcher )
+{
+    // Add the consumer to the map.
+    synchronized( &dispatchers )
+    {
+        dispatchers.setValue( consumer->getConsumerId(), dispatcher );
+    }
+}
+        
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::removeDispatcher( const connector::ConsumerInfo* consumer ) {
+    
+    // Remove the consumer from the map.
+    synchronized( &dispatchers )
+    {
+        dispatchers.remove( consumer->getConsumerId() );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::Session* ActiveMQConnection::createSession() throw ( cms::CMSException )
 {
     try
     {
-        return this->createSession( Session::AUTO_ACKNOWLEDGE );
+        return createSession( Session::AUTO_ACKNOWLEDGE );
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCHALL_THROW( ActiveMQException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-cms::Session* ActiveMQConnection::createSession(
-   cms::Session::AcknowledgeMode ackMode )
-      throw ( cms::CMSException )
+cms::Session* ActiveMQConnection::createSession( 
+    cms::Session::AcknowledgeMode ackMode ) throw ( cms::CMSException )
 {
     try
     {
+        // Determine whether or not to make dispatch for this session asynchronous        
+        bool doSessionAsync = alwaysSessionAsync || !activeSessions.isEmpty() || 
+            ackMode==Session::SESSION_TRANSACTED || ackMode==Session::CLIENT_ACKNOWLEDGE;
+                        
         // Create the session instance.
         ActiveMQSession* session = new ActiveMQSession(
             connectionData->getConnector()->createSession( ackMode ),
             connectionData->getProperties(),
-            this );
+            this,
+            doSessionAsync );
 
         // Add the session to the set of active sessions.
         synchronized( &activeSessions ) {
             activeSessions.add( session );
         }
+        
+        // If we're already started, start the session.
+        if( started ) {
+            session->start();
+        }
 
         return session;
     }
@@ -107,7 +141,7 @@
         }
 
         // Get the complete list of active sessions.
-        std::vector<cms::Session*> allSessions;
+        std::vector<ActiveMQSession*> allSessions;
         synchronized( &activeSessions ) {
             allSessions = activeSessions.toArray();
         }
@@ -145,6 +179,12 @@
     // messages delivered while this connection is stopped are dropped
     // and not acknowledged.
     started = true;
+    
+    // Start all the sessions.
+    std::vector<ActiveMQSession*> sessions = activeSessions.toArray();
+    for( unsigned int ix=0; ix<sessions.size(); ++ix ) {
+        sessions[ix]->start();
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -153,26 +193,11 @@
     // Once current deliveries are done this stops the delivery of any
     // new messages.
     started = false;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::addMessageListener( long long consumerId,
-                                             ActiveMQMessageListener* listener )
-{
-    // Place in Map
-    synchronized( &consumers )
-    {
-        consumers.setValue( consumerId, listener );
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::removeMessageListener( long long consumerId )
-{
-    // Remove from Map
-    synchronized( &consumers )
-    {
-        consumers.remove( consumerId );
+    
+    // Stop all the sessions.
+    std::vector<ActiveMQSession*> sessions = activeSessions.toArray();
+    for( unsigned int ix=0; ix<sessions.size(); ++ix ) {
+        sessions[ix]->stop();
     }
 }
 
@@ -211,16 +236,17 @@
             return;
         }
 
-        // Started, so lock map and dispatch the message.
-        synchronized( &consumers )
+        // Look up the dispatcher.
+        Dispatcher* dispatcher = NULL;
+        synchronized( &dispatchers )
         {
-            if( consumers.containsKey(consumer->getConsumerId()) )
-            {
-                ActiveMQMessageListener* listener =
-                    consumers.getValue(consumer->getConsumerId());
-
-                listener->onActiveMQMessage( message );
-            }
+            dispatcher = dispatchers.getValue(consumer->getConsumerId());
+        }
+        
+        // Dispatch the message.
+        if( dispatcher != NULL ) {
+            DispatchData data( consumer, message );
+            dispatcher->dispatch( data );
         }
     }
     catch( exceptions::ActiveMQException& ex )

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?view=diff&rev=519590&r1=519589&r2=519590
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h Sun Mar 18 03:54:18 2007
@@ -21,8 +21,8 @@
 #include <cms/Connection.h>
 #include <cms/ExceptionListener.h>
 #include <activemq/core/ActiveMQConnectionData.h>
-#include <activemq/core/ActiveMQMessageListener.h>
 #include <activemq/core/ActiveMQMessage.h>
+#include <activemq/core/Dispatcher.h>
 #include <activemq/connector/ConsumerMessageListener.h>
 #include <activemq/util/Properties.h>
 #include <activemq/util/Map.h>
@@ -31,6 +31,11 @@
 #include <string>
 
 namespace activemq{
+    
+    namespace connector {
+        class ConsumerInfo;
+    }
+    
 namespace core{
 
     class cms::Session;
@@ -70,14 +75,20 @@
         bool closed;
 
         /**
-         * Map of Consumer Ids to ActiveMQMessageListeners
+         * Map of message dispatchers indexed by consumer id.
          */
-        util::Map< long long, ActiveMQMessageListener* > consumers;
+        util::Map< long long, Dispatcher* > dispatchers;
 
         /**
          * Maintain the set of all active sessions.
          */
-        util::Set<cms::Session*> activeSessions;
+        util::Set<ActiveMQSession*> activeSessions;
+        
+        /**
+         * If true, dispatch for all sessions will be asynchronous to the 
+         * transport.
+         */
+        bool alwaysSessionAsync;
 
     public:
 
@@ -95,6 +106,19 @@
          * @param session The session to be unregistered from this connection.
          */
         virtual void removeSession( ActiveMQSession* session ) throw ( cms::CMSException );
+        
+        /**
+         * Adds a dispatcher for a consumer.
+         * @param consumer - The consumer for which to register a dispatcher.
+         * @param dispatcher - The dispatcher to handle incoming messages for the consumer.
+         */
+        virtual void addDispatcher( connector::ConsumerInfo* consumer, Dispatcher* dispatcher );
+        
+        /**
+         * Removes the dispatcher for a consumer.
+         * @param consumer - The consumer for which to remove the dispatcher.
+         */
+        virtual void removeDispatcher( const connector::ConsumerInfo* consumer );
 
     public:   // Connection Interface Methods
 
@@ -161,24 +185,6 @@
          * @throws CMSException
          */
         virtual void stop() throw ( cms::CMSException );
-
-    public:   // ActiveMQConnection Methods
-
-        /**
-         * Adds the ActiveMQMessageListener to the Mapping of Consumer Id's
-         * to listeners, all message to that id will be routed to the given
-         * listener
-         * @param consumerId Consumer Id String
-         * @param listener ActiveMQMessageListener Pointer
-         */
-        virtual void addMessageListener( long long consumerId,
-                                         ActiveMQMessageListener* listener );
-
-        /**
-         * Remove the Listener for the specified Consumer Id
-         * @param consumerId Consumer Id string
-         */
-        virtual void removeMessageListener( long long consumerId );
 
     public:     // ExceptionListener interface methods
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp?view=diff&rev=519590&r1=519589&r2=519590
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp Sun Mar 18 03:54:18 2007
@@ -41,11 +41,10 @@
             __FILE__, __LINE__,
             "ActiveMQConsumer::ActiveMQConsumer - Init with NULL Session");
     }
-
+    
     // Init Producer Data
     this->session        = session;
     this->consumerInfo   = consumerInfo;
-    this->listenerThread = NULL;
     this->listener       = NULL;
     this->closed         = false;
 
@@ -67,62 +66,51 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::close()
+void ActiveMQConsumer::close() 
     throw ( cms::CMSException )
 {
     try
     {
         if( !closed ) {
-
+            
             // Identifies any errors encountered during shutdown.
             bool haveException = false;
-            ActiveMQException error;
-
+            ActiveMQException error; 
+            
             // Close the ConsumerInfo
             if( !consumerInfo->isClosed() ) {
                 try{
-                    // We don't want a callback now
-                    this->consumerInfo->removeListener( this );
-                    this->consumerInfo->close();
-                } catch ( ActiveMQException& ex ){
-                    if( !haveException ){
-                        ex.setMark( __FILE__, __LINE__ );
+                        // We don't want a callback now
+                        this->consumerInfo->removeListener( this );
+                        this->consumerInfo->close();
+                } catch( ActiveMQException& ex ){
+                    if( !haveException ){ 
+                        ex.setMark( __FILE__, __LINE__ );                
                         error = ex;
                         haveException = true;
                     }
                 }
             }
-
+            
             closed = true;
-
-            // Stop the asynchronous message processin thread if it's
-            // running.
-            try{
-                stopThread();
-            } catch ( ActiveMQException& ex ){
-                if( !haveException ){
-                    ex.setMark( __FILE__, __LINE__ );
-                    error = ex;
-                    haveException = true;
-                }
-            }
-
+            
             // Purge all the pending messages
             try{
                 purgeMessages();
             } catch ( ActiveMQException& ex ){
-                if( !haveException ){
-                    ex.setMark( __FILE__, __LINE__ );
+                if( !haveException ){ 
+                    ex.setMark( __FILE__, __LINE__ );                
                     error = ex;
                     haveException = true;
                 }
             }
-
+            
             // If we encountered an error, propagate it.
             if( haveException ){
                 error.setMark( __FILE__, __LINE__ );
                 throw error;
             }
+                                  
         }
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
@@ -130,7 +118,7 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-std::string ActiveMQConsumer::getMessageSelector() const
+std::string ActiveMQConsumer::getMessageSelector() const 
     throw ( cms::CMSException )
 {
     try
@@ -154,15 +142,15 @@
                 "ActiveMQConsumer::receive - This Consumer is closed" );
         }
 
-        synchronized( &msgQueue )
+        synchronized( &unconsumedMessages )
         {
             // Check for empty in case of spurious wakeup, or race to
             // queue lock.
-            while( !closed && msgQueue.empty() )
+            while( !closed && unconsumedMessages.empty() )
             {
-                msgQueue.wait();
+                unconsumedMessages.wait();
             }
-
+            
             // This will only happen when this object is being
             // closed in another thread context - kind of
             // scary.
@@ -170,15 +158,16 @@
                 throw ActiveMQException( __FILE__, __LINE__,
                     "Consumer is being closed in another thread" );
             }
-
+            
             // Fetch the Message then copy it so it can be handed off
             // to the user.
-            cms::Message* message = msgQueue.pop();
+            DispatchData data = unconsumedMessages.pop();
+            cms::Message* message = dynamic_cast<cms::Message*>(data.getMessage());
             cms::Message* result = message->clone();
 
             // The Message is cleaned up here if the Session is not
             // transacted, otherwise we let the transaction clean up
-            // this message as it will have already been ack'd and
+            // this message as it will have already been ack'd and 
             // stored for later redelivery.
             destroyMessage( message );
 
@@ -192,7 +181,7 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-cms::Message* ActiveMQConsumer::receive( int millisecs )
+cms::Message* ActiveMQConsumer::receive( int millisecs ) 
     throw ( cms::CMSException )
 {
     try
@@ -204,19 +193,19 @@
                 "ActiveMQConsumer::receive - This Consumer is closed" );
         }
 
-        synchronized( &msgQueue )
+        synchronized( &unconsumedMessages )
         {
             // Check for empty, and wait if its not
-            if( !closed && msgQueue.empty() ){
-
-                msgQueue.wait(millisecs);
+            if( !closed && unconsumedMessages.empty() ){
+                
+                unconsumedMessages.wait(millisecs);
 
                 // if its still empty...bail
-                if( msgQueue.empty() ) {
+                if( unconsumedMessages.empty() ) {
                     return NULL;
                 }
             }
-
+            
             // This will only happen when this object is being
             // closed in another thread context - kind of
             // scary.
@@ -224,15 +213,16 @@
                 throw ActiveMQException( __FILE__, __LINE__,
                     "Consumer is being closed in another thread" );
             }
-
+            
             // Fetch the Message then copy it so it can be handed off
             // to the user.
-            cms::Message* message = msgQueue.pop();
+            DispatchData data = unconsumedMessages.pop();
+            cms::Message* message = dynamic_cast<cms::Message*>(data.getMessage());
             cms::Message* result = message->clone();
 
             // The Message is cleaned up here if the Session is not
             // transacted, otherwise we let the transaction clean up
-            // this message as it will have already been ack'd and
+            // this message as it will have already been ack'd and 
             // stored for later redelivery.
             destroyMessage( message );
 
@@ -246,7 +236,7 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-cms::Message* ActiveMQConsumer::receiveNoWait()
+cms::Message* ActiveMQConsumer::receiveNoWait() 
     throw ( cms::CMSException )
 {
     try
@@ -258,25 +248,26 @@
                 "ActiveMQConsumer::receive - This Consumer is closed" );
         }
 
-        synchronized( &msgQueue )
+        synchronized( &unconsumedMessages )
         {
-            if( !msgQueue.empty() )
+            if( !unconsumedMessages.empty() )
             {
                 // Fetch the Message then copy it so it can be handed off
                 // to the user.
-                cms::Message* message = msgQueue.pop();
+                DispatchData data = unconsumedMessages.pop();
+                cms::Message* message = dynamic_cast<cms::Message*>(data.getMessage());
                 cms::Message* result = message->clone();
 
                 // The Message is cleaned up here if the Session is not
                 // transacted, otherwise we let the transaction clean up
-                // this message as it will have already been ack'd and
+                // this message as it will have already been ack'd and 
                 // stored for later redelivery.
                 destroyMessage( message );
 
                 return result;
             }
         }
-
+        
         return NULL;
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
@@ -295,16 +286,24 @@
                 "ActiveMQConsumer::receive - This Consumer is closed" );
         }
 
-        synchronized( &listenerLock )
-        {
-            this->listener = listener;
-        }
-
-        // Start the thread if it isn't already running.
-        // If it is already running, this method will wake the thread up
-        // to notify it that there is a message listener, so that it may
-        // get rid of backed up messages.
-        startThread();
+        this->listener = listener;
+        
+        if( listener != NULL && session != NULL ) {
+            
+            // Now that we have a valid message listener,
+            // redispatch all the messages that it missed.
+            
+            bool wasStarted = session->isStarted();
+            if( wasStarted ) {
+                session->stop();
+            }
+                        
+            session->redispatch( unconsumedMessages );
+            
+            if( wasStarted ) {
+                session->start();
+            }
+        }       
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCHALL_THROW( ActiveMQException )
@@ -333,70 +332,13 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::run()
-{
-    try
-    {
-        while( !closed )
-        {
-            Message* message = NULL;
-
-            synchronized( &msgQueue )
-            {
-
-                // Gaurd against spurious wakeup or race to sync lock
-                // also if the listner has been unregistered we don't
-                // have anyone to notify, so we wait till a new one is
-                // registered, and then we will deliver the backlog
-                while( msgQueue.empty() || listener == NULL )
-                {
-                    if( closed )
-                    {
-                        break;
-                    }
-                    msgQueue.wait();
-                }
-
-                // don't want to process messages if we are shutting down.
-                if( closed )
-                {
-                    return;
-                }
-
-                // Dispatch the message
-                message = msgQueue.pop();
-            }
-
-            // Notify the listener
-            notifyListener( message );
-
-            // The Message is cleaned up here if the Session is not
-            // transacted, otherwise we let the transaction clean up
-            // this message as it will have already been ack'd and
-            // stored for later redelivery.
-            destroyMessage( message );
-        }
-    }
-    catch( ... )
-    {
-        cms::ExceptionListener* listener = session->getExceptionListener();
-
-        if( listener != NULL )
-        {
-            listener->onException( ActiveMQException(
-                __FILE__, __LINE__,
-                "ActiveMQConsumer::run - "
-                "MessageListener threw an unknown Exception, recovering..." ) );
-        }
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::dispatch( ActiveMQMessage* message )
-    throw ( cms::CMSException )
+void ActiveMQConsumer::dispatch( DispatchData& data )
 {
     try
     {
+        
+        ActiveMQMessage* message = data.getMessage();
+        
         // Don't dispatch expired messages, ack it and then destroy it
         if( message->isExpired() ) {
             session->acknowledge( this, message );
@@ -405,25 +347,32 @@
             // stop now, don't queue
             return;
         }
-
-        // If the Session is in ClientAcknowledge mode, then we set the
+        
+        // If the Session is in ClientAcknowledge mode, then we set the 
         // handler in the message to this object and send it out.  Otherwise
         // we ack it here for all the other Modes.
         if( session->getAcknowledgeMode() == Session::CLIENT_ACKNOWLEDGE ) {
-
+            
             // Register ourself so that we can handle the Message's
             // acknowledge method.
             message->setAckHandler( this );
-
+            
         } else {
             session->acknowledge( this, message );
         }
-
-        // No listener, so we queue it
-        synchronized( &msgQueue ) {
-
-            msgQueue.push( dynamic_cast< cms::Message* >( message ) );
-            msgQueue.notifyAll();
+        
+        // If we have a listener, send the message.
+        if( listener != NULL ) {
+            cms::Message* message = dynamic_cast<cms::Message*>(data.getMessage());
+            listener->onMessage( message );
+            destroyMessage( message );
+        } else {
+            
+            // No listener, add it to the unconsumed messages list
+            synchronized( &unconsumedMessages ) {
+                unconsumedMessages.push( data );
+                unconsumedMessages.notifyAll();
+            }
         }
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
@@ -435,14 +384,14 @@
 {
     try
     {
-        synchronized( &msgQueue )
+        synchronized( &unconsumedMessages )
         {
-            while( !msgQueue.empty() )
+            while( !unconsumedMessages.empty() )
             {
                 // destroy these messages if this is not a transacted
-                // session, if it is then the tranasction will clean
+                // session, if it is then the transaction will clean 
                 // the messages up.
-                destroyMessage( msgQueue.pop() );
+                destroyMessage( dynamic_cast<cms::Message*>(unconsumedMessages.pop().getMessage()) );
             }
         }
     }
@@ -451,46 +400,8 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::onActiveMQMessage( ActiveMQMessage* message )
-    throw ( ActiveMQException )
-{
-    try
-    {
-        if( message == NULL )
-        {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "ActiveMQConsumer::onActiveMQMessage - Passed a Null Message");
-        }
-
-        this->dispatch( message );
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::notifyListener( Message* message ) throw ( ActiveMQException ){
-
-    try
-    {
-        MessageListener* listener = NULL;
-        synchronized( &listenerLock )
-        {
-            listener = getMessageListener();
-        }
-        if(listener != NULL)
-        {
-            listener->onMessage( message );
-        }
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumer::destroyMessage( Message* message ) throw ( ActiveMQException ){
-
+    
     try
     {
         /**
@@ -501,54 +412,7 @@
         {
             delete message;
         }
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::startThread() throw ( ActiveMQException ) {
-
-    try
-    {
-        // Start the thread, if it's not already started.
-        if( listenerThread == NULL )
-        {
-            listenerThread = new Thread( this );
-            listenerThread->start();
-        }
-
-        // notify the Queue so that any pending messages get delivered
-        synchronized( &msgQueue )
-        {
-            msgQueue.notifyAll();
-        }
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::stopThread() throw ( ActiveMQException ) {
-
-    try
-    {
-        // if the thread is running signal it to quit and then
-        // wait for run to return so thread can die
-        if( listenerThread != NULL )
-        {
-            synchronized( &msgQueue )
-            {
-                // Force a wakeup if run is in a wait.
-                msgQueue.notifyAll();
-            }
-
-            // Wait for it to die and then delete it.
-            listenerThread->join();
-            delete listenerThread;
-            listenerThread = NULL;
-        }
-    }
+    }        
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCHALL_THROW( ActiveMQException )
 }
@@ -580,3 +444,4 @@
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCHALL_THROW( ActiveMQException )
 }
+

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h?view=diff&rev=519590&r1=519589&r2=519590
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h Sun Mar 18 03:54:18 2007
@@ -26,8 +26,7 @@
 #include <activemq/connector/ConnectorResourceListener.h>
 #include <activemq/util/Queue.h>
 #include <activemq/core/ActiveMQAckHandler.h>
-#include <activemq/core/ActiveMQMessageListener.h>
-#include <activemq/concurrent/Runnable.h>
+#include <activemq/core/Dispatcher.h>
 #include <activemq/concurrent/Mutex.h>
 
 namespace activemq{
@@ -35,36 +34,29 @@
 
     class ActiveMQSession;
 
-    class ActiveMQConsumer :
+    class ActiveMQConsumer : 
         public cms::MessageConsumer,
         public ActiveMQAckHandler,
-        public concurrent::Runnable,
-        public ActiveMQMessageListener,
-        public connector::ConnectorResourceListener
+        public Dispatcher,
+		public connector::ConnectorResourceListener
     {
     private:
-
+    
         // The session that owns this Consumer
         ActiveMQSession* session;
-
+        
         // The Consumer info for this Consumer
         connector::ConsumerInfo* consumerInfo;
-
+        
         // The Message Listener for this Consumer
         cms::MessageListener* listener;
-
-        // Lock to protect us from dispatching to a dead listener
-        concurrent::Mutex listenerLock;
-
-        // Message Queue
-        util::Queue<cms::Message*> msgQueue;
-
-        // Thread to notif a listener if one is added
-        concurrent::Thread* listenerThread;
-
+        
+        // Queue of unconsumed messages.
+        util::Queue<DispatchData> unconsumedMessages;
+        
         // Boolean that indicates if the consumer has been closed
         bool closed;
-
+        
     public:
 
         /**
@@ -85,7 +77,7 @@
          * @throws CMSException
          */
         virtual void close() throw ( cms::CMSException );
-
+         
         /**
          * Synchronously Receive a Message
          * @return new message
@@ -129,9 +121,9 @@
          * @return This Consumer's selector expression or "".
          * @throws cms::CMSException
          */
-        virtual std::string getMessageSelector() const
+        virtual std::string getMessageSelector() const 
             throw ( cms::CMSException );
-
+          
         /**
          * Method called to acknowledge the message passed
          * @param message the Message to Acknowlegde
@@ -140,31 +132,16 @@
         virtual void acknowledgeMessage( const ActiveMQMessage* message )
             throw ( cms::CMSException );
 
+    public:  // Dispatcher Methods
+    
         /**
-         * Run method that is called from the Thread class when this object
-         * is registered with a Thread and started.  This function reads from
-         * the message queue and dispatches calls to the MessageConsumer that
-         * is registered with this class.
-         *
-         * It is a error for a MessageListener to throw an exception in their
-         * onMessage method, but if it does happen this function will get any
-         * registered exception listener from the session and notify it.
-         */
-        virtual void run();
-
-    public:  // ActiveMQMessageListener Methods
-
-        /**
-         * Called asynchronously when a new message is received, the message
-         * that is passed is now the property of the callee, and the caller
-         * will disavowe all knowledge of the message, i.e Callee must delete.
+         * Called asynchronously by the session to dispatch a message.
          * @param message object pointer
          */
-        virtual void onActiveMQMessage( ActiveMQMessage* message )
-            throw ( exceptions::ActiveMQException );
-
+        virtual void dispatch( DispatchData& message );
+    
     public:  // ActiveMQSessionResource
-
+    
         /**
          * Retrieve the Connector resource that is associated with
          * this Session resource.
@@ -177,19 +154,8 @@
     public:  // ActiveMQConsumer Methods
 
         /**
-         * Called to dispatch a message to this consumer, this is usually
-         * called from the context of another thread.  This will enqueue a
-         * message on the Consumers Queue, or notify a listener if one is
-         * currently registered.
-         * @param message cms::Message pointer to the message to dispatch
-         * @throws cms::CMSException
-         */
-        virtual void dispatch( ActiveMQMessage* message )
-            throw ( cms::CMSException );
-
-        /**
          * Get the Consumer information for this consumer
-         * @return Pointer to a Consumer Info Object
+         * @return Pointer to a Consumer Info Object            
          */
         virtual connector::ConsumerInfo* getConsumerInfo() {
             return consumerInfo;
@@ -207,40 +173,20 @@
             const connector::ConnectorResource* resource ) throw ( cms::CMSException );
 
     protected:
-
+            
         /**
          * Purges all messages currently in the queue.  This can be as a
          * result of a rollback, or of the consumer being shutdown.
          */
         virtual void purgeMessages() throw (exceptions::ActiveMQException);
-
+        
         /**
          * Destroys the message if the session is transacted, otherwise
          * does nothing.
          * @param message the message to destroy
          */
-        virtual void destroyMessage( cms::Message* message )
-            throw (exceptions::ActiveMQException);
-
-        /**
-         * Notifies the listener of a message.
-         * @param message the message to pass to the listener
-         */
-        void notifyListener( cms::Message* message )
+        virtual void destroyMessage( cms::Message* message ) 
             throw (exceptions::ActiveMQException);
-
-        /**
-         * Starts the message processing thread to receive messages
-         * asynchronously.  This thread is started when setMessageListener
-         * is invoked, which means that the caller is choosing to use this
-         * consumer asynchronously instead of synchronously (receive).
-         */
-        void startThread() throw (exceptions::ActiveMQException);
-
-        /**
-         * Stops the asynchronous message processing thread if it's started.
-         */
-        void stopThread() throw (exceptions::ActiveMQException);
 
     };
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?view=diff&rev=519590&r1=519589&r2=519590
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Sun Mar 18 03:54:18 2007
@@ -24,6 +24,7 @@
 #include <activemq/core/ActiveMQConsumer.h>
 #include <activemq/core/ActiveMQMessage.h>
 #include <activemq/core/ActiveMQProducer.h>
+#include <activemq/core/ActiveMQSessionExecutor.h>
 #include <activemq/util/Boolean.h>
 
 #include <activemq/connector/TransactionInfo.h>
@@ -40,7 +41,8 @@
 ////////////////////////////////////////////////////////////////////////////////
 ActiveMQSession::ActiveMQSession( SessionInfo* sessionInfo,
                                   const Properties& properties,
-                                  ActiveMQConnection* connection)
+                                  ActiveMQConnection* connection,
+                                  bool sessionAsyncDispatch )
 {
     if( sessionInfo == NULL || connection == NULL )
     {
@@ -54,6 +56,7 @@
     this->connection   = connection;
     this->closed       = false;
     this->asyncThread  = NULL;
+    this->sessionAsyncDispatch = sessionAsyncDispatch;
     this->useAsyncSend = Boolean::parseBoolean(
         properties.getProperty( "useAsyncSend", "false" ) );
 
@@ -69,6 +72,9 @@
         transaction =
             new ActiveMQTransaction(connection, this, properties );
     }
+    
+    // Create the session executor object.
+    executor = new ActiveMQSessionExecutor( this );
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -93,11 +99,14 @@
 
     try
     {
+        // Stop the dispatch executor.
+        stop();        
+        
         // Get the complete list of closeable session resources.
         std::vector<cms::Closeable*> allResources;
         synchronized( &closableSessionResources ) {
             allResources = closableSessionResources.toArray();
-        }
+        }        
 
         // Close all of the resources.
         for( unsigned int ix=0; ix<allResources.size(); ++ix ){
@@ -127,6 +136,9 @@
 
         // Remove any unsent cloned messages.
         purgeMessages();
+        
+        delete executor;
+        executor = NULL;
     }
     AMQ_CATCH_NOTHROW( ActiveMQException )
     AMQ_CATCHALL_NOTHROW( )
@@ -227,31 +239,38 @@
                 "ActiveMQSession::createConsumer - Session Already Closed" );
         }
 
-        ConsumerInfo* consumerInfo =
+		ConsumerInfo* consumerInfo =
             connection->getConnectionData()->getConnector()->
                 createConsumer( destination,
                                 sessionInfo,
                                 selector,
                                 noLocal );
 
-        // Add to Session Closeables and Monitor for close, if needed.
+		// Add to Session Closeables and Monitor for close, if needed.
         checkConnectorResource(
             dynamic_cast<ConnectorResource*>( consumerInfo ) );
 
-        // Create the consumer instance.
+		// Create the consumer instance.
         ActiveMQConsumer* consumer = new ActiveMQConsumer(
             consumerInfo, this );
 
-        // Register this consumer as a listener of messages from the
-        // connection.
-        connection->addMessageListener(
-            consumer->getConsumerInfo()->getConsumerId(), consumer );
+
+        // Add the consumer to the map.
+        synchronized( &consumers ) {
+            consumers.setValue( consumerInfo->getConsumerId(), consumer );
+        }
+
+        // Register this as a message dispatcher for the consumer.
+        connection->addDispatcher( consumerInfo, this );
 
         // Start the Consumer, we are now ready to receive messages
         try{
             connection->getConnectionData()->getConnector()->startConsumer(
-                consumer->getConsumerInfo() );
+                consumerInfo );
         } catch( ActiveMQException& ex ) {
+			synchronized( &consumers ) {
+            	consumers.remove( consumerInfo->getConsumerId() );
+        	}
             delete consumer;
             ex.setMark( __FILE__, __LINE__ );
             throw ex;
@@ -293,16 +312,22 @@
         ActiveMQConsumer* consumer = new ActiveMQConsumer(
             consumerInfo, this );
 
-        // Register the consumer as a listener of messages from the
-        // connection.
-        connection->addMessageListener(
-            consumer->getConsumerInfo()->getConsumerId(), consumer );
+        // Add the consumer to the map.
+        synchronized( &consumers ) {
+            consumers.setValue( consumerInfo->getConsumerId(), consumer );
+        }
+
+        // Register this as a message dispatcher for the consumer.
+        connection->addDispatcher( consumerInfo, this );
 
         // Start the Consumer, we are now ready to receive messages
         try{
             connection->getConnectionData()->getConnector()->startConsumer(
-                consumer->getConsumerInfo() );
+                consumerInfo );
         } catch( ActiveMQException& ex ) {
+			synchronized( &consumers ) {
+            	consumers.remove( consumerInfo->getConsumerId() );
+        	}		
             delete consumer;
             ex.setMark( __FILE__, __LINE__ );
             throw ex;
@@ -582,7 +607,7 @@
         }
 
         cms::MapMessage* message = connection->getConnectionData()->
-            getConnector()->createMapMessage( sessionInfo, transaction );
+                getConnector()->createMapMessage( sessionInfo, transaction );
 
         // Add to Session Closeables and Monitor for close, if needed.
         checkConnectorResource(
@@ -653,7 +678,7 @@
                 "ActiveMQSession::onProducerClose - Session Already Closed" );
         }
 
-        if( useAsyncSend == true ) {
+        if( useAsyncSend ) {
 
             // Put it in the send queue, thread will dispatch it.  We clone it
             // in case the client deletes their copy before we get a chance to
@@ -689,11 +714,10 @@
         const ConsumerInfo* consumer =
             dynamic_cast<const ConsumerInfo*>( resource );
 
-        if( consumer != NULL ) {
-
-            // Remove this Consumer from the Connection
-            connection->removeMessageListener(
-                consumer->getConsumerId() );
+        if( consumer != NULL )
+        {
+            // Remove the dispatcher for the Connection
+            connection->removeDispatcher( consumer );                           
 
             // Remove this consumer from the Transaction if we are
             // transactional
@@ -701,6 +725,11 @@
                 transaction->removeFromTransaction(
                     consumer->getConsumerId() );
             }
+            
+            // Remove this consumer from the consumers map.
+            synchronized( &consumers ) {
+                consumers.remove( consumer->getConsumerId() );
+            }
         }
 
         // Remove the entry from the session resource map if it's there
@@ -884,3 +913,58 @@
     // Register as a Listener
     resource->addListener( this );
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::dispatch( DispatchData& message ) {
+
+    // Forward to the executor, if there is one; otherwise ignore the data.
+    if( executor == NULL ) return;
+    executor->execute( message );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::redispatch( util::Queue<DispatchData>& unconsumedMessages )
+{
+    // With no executor there is nowhere to requeue; leave the list untouched.
+    if( executor == NULL ) return;
+
+    // Copy the list in reverse order under its lock, then empty the original.
+    util::Queue<DispatchData> reversedList;
+    synchronized( &unconsumedMessages ) {
+        unconsumedMessages.reverse( reversedList );
+        unconsumedMessages.clear();
+    }
+    // Offer each entry to the front of the executor.
+    while( !reversedList.empty() ) {
+        DispatchData data = reversedList.pop();
+        executor->executeFirst( data );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::start()
+{
+    // Delegates to the executor; a session without one has nothing to start.
+    if( executor == NULL ) return;
+    executor->start();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::stop()
+{
+    // Delegates to the executor; a session without one has nothing to stop.
+    if( executor == NULL ) return;
+    executor->stop();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQSession::isStarted() const
+{
+    // A session without an executor is never started; otherwise report
+    // the executor's own state.
+    const bool hasExecutor = ( executor != NULL );
+
+    return hasExecutor && executor->isStarted();
+}
+
+

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h?view=diff&rev=519590&r1=519589&r2=519590
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h Sun Mar 18 03:54:18 2007
@@ -22,9 +22,11 @@
 #include <activemq/concurrent/Runnable.h>
 #include <activemq/concurrent/Mutex.h>
 #include <activemq/connector/SessionInfo.h>
+#include <activemq/core/Dispatcher.h>
 #include <activemq/util/Set.h>
 #include <activemq/util/Queue.h>
 #include <activemq/connector/ConnectorResourceListener.h>
+#include <activemq/util/Map.h>
 #include <set>
 
 namespace activemq{
@@ -36,9 +38,11 @@
     class ActiveMQMessage;
     class ActiveMQProducer;
     class ActiveMQConsumer;
+    class ActiveMQSessionExecutor;
 
     class ActiveMQSession :
         public cms::Session,
+        public Dispatcher,
         public concurrent::Runnable,
         public connector::ConnectorResourceListener
     {
@@ -70,6 +74,11 @@
          * destination.
          */
         util::Set<cms::Closeable*> closableSessionResources;
+        
+        /**
+         * Map of consumers.
+         */
+        util::Map<long long, ActiveMQConsumer*> consumers;
 
         /**
          *  Thread to notif a listener if one is added
@@ -80,19 +89,68 @@
          * Is this Session using Async Sends.
          */
         bool useAsyncSend;
+        
+        /**
+         * Indicates whether or not dispatching should be done asynchronously.
+         */
+        bool sessionAsyncDispatch;
 
         /**
          * Outgoing Message Queue
          */
         util::Queue< std::pair<cms::Message*, ActiveMQProducer*> > msgQueue;
+        
+        ActiveMQSessionExecutor* executor;
 
     public:
 
         ActiveMQSession( connector::SessionInfo* sessionInfo,
                          const util::Properties& properties,
-                         ActiveMQConnection* connection );
+                         ActiveMQConnection* connection,
+                         bool sessionAsyncDispatch );
 
         virtual ~ActiveMQSession();
+        
+        /**
+         * Indicates whether or not dispatching should be done asynchronously.
+         */
+        bool isSessionAsyncDispatch() const {
+            return sessionAsyncDispatch;
+        }
+        
+        util::Map<long long, ActiveMQConsumer*>& getConsumers() {
+            return consumers;
+        }
+        
+        /**
+         * Redispatches the given set of unconsumed messages to the consumers.
+         * @param unconsumedMessages - unconsumed messages to be redelivered.
+         */
+        void redispatch( util::Queue<DispatchData>& unconsumedMessages );
+        
+        /**
+         * Starts asynchronous message delivery.
+         */
+        void start();
+        
+        /**
+         * Stops asynchronous message delivery.
+         */
+        void stop();
+        
+        /**
+         * Indicates whether or not the session is currently in the started
+         * state.
+         */
+        bool isStarted() const;
+    
+    public:  // Methods from Dispatcher
+    
+        /**
+         * Dispatches a message to a particular consumer.
+         * @param message - the message to be dispatched
+         */
+        virtual void dispatch( DispatchData& message );
 
     public:   // Implements Mehtods
 
@@ -267,21 +325,21 @@
          * @return transacted true - false.
          */
         virtual bool isTransacted() const;
-
+        
         /**
-         * Unsubscribes a durable subscription that has been created by a
+         * Unsubscribes a durable subscription that has been created by a 
          * client.
-         *
-         * This method deletes the state being maintained on behalf of the
-         * subscriber by its provider.  It is erroneous for a client to delete a
-         * durable subscription while there is an active MessageConsumer or
-         * Subscriber for the subscription, or while a consumed message is
-         * part of a pending transaction or has not been acknowledged in the
+         * 
+         * This method deletes the state being maintained on behalf of the 
+         * subscriber by its provider.  It is erroneous for a client to delete a 
+         * durable subscription while there is an active MessageConsumer or 
+         * Subscriber for the subscription, or while a consumed message is 
+         * part of a pending transaction or has not been acknowledged in the 
          * session.
          * @param name the name used to identify this subscription
          * @throws CMSException
          */
-        virtual void unsubscribe( const std::string& name )
+        virtual void unsubscribe( const std::string& name ) 
             throw ( cms::CMSException );
 
    public:   // ActiveMQSession specific Methods

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp?view=auto&rev=519590
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp Sun Mar 18 03:54:18 2007
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#include "ActiveMQSessionExecutor.h"
+#include "ActiveMQSession.h"
+#include "ActiveMQMessage.h"
+#include "ActiveMQConsumer.h"
+#include <activemq/connector/ConsumerInfo.h>
+ 
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::util;
+using namespace activemq::concurrent;
+using namespace activemq::exceptions;
+
+//////////////////////////////////////////////////////////////////////////////// 
+ActiveMQSessionExecutor::ActiveMQSessionExecutor( ActiveMQSession* session )
+    : session( session ),  // session whose consumers will be dispatched to
+      started( false ),    // dispatching stays off until start() is called
+      thread( NULL )       // start() creates the thread, for async sessions only
+{}
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQSessionExecutor::~ActiveMQSessionExecutor() {
+    // Shut down dispatching first; stop() also joins and deletes the
+    // dispatch thread when one exists.
+    this->stop();
+
+    // Then destroy whatever messages are still sitting in the queue.
+    this->clear();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionExecutor::execute( DispatchData& data ) {
+    // Sync sessions deliver at once, in the context of the calling
+    // (transport) thread, and bypass the queue entirely.
+    if( !session->isSessionAsyncDispatch() ) {
+        dispatch( data );
+        return;
+    }
+
+    // Async sessions append to the queue and then wake the dispatch thread.
+    synchronized( &messageQueue ) {
+        messageQueue.push( data );
+        wakeup();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionExecutor::executeFirst( DispatchData& data ) {
+    // A started sync session delivers at once on the calling thread.
+    const bool deliverNow = !session->isSessionAsyncDispatch() && started;
+    if( deliverNow ) {
+        dispatch( data );
+        return;
+    }
+
+    // Otherwise (async, or not yet started) put the data at the queue head.
+    synchronized( &messageQueue ) {
+        messageQueue.enqueueFront( data );
+        wakeup();
+    }
+}
+    
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionExecutor::start() {
+    synchronized( &messageQueue ) {
+        started = true;
+
+        // Only async sessions need a thread, and only if none exists yet.
+        const bool needsThread =
+            session->isSessionAsyncDispatch() && thread == NULL;
+        if( needsThread ) {
+            thread = new Thread( this );
+            thread->start();
+        }
+        wakeup();  // begin working through anything already queued
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionExecutor::stop() {
+    // Clear the flag under the queue lock and wake any waiting thread.
+    synchronized( &messageQueue ) {
+        started = false;
+        wakeup();
+    }
+
+    // No thread was created (or it is already gone): nothing to join.
+    if( thread == NULL ) return;
+
+    thread->join();  // done after the synchronized block above has ended
+    delete thread;
+    thread = NULL;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionExecutor::clear() {
+
+    synchronized( &messageQueue ) {
+
+        // Pop entries until the queue is empty, destroying each message.
+        while( messageQueue.empty() == false ) {
+            DispatchData entry = messageQueue.pop();
+            delete entry.getMessage();
+        }
+
+        wakeup();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionExecutor::dispatch( DispatchData& data ) {
+    // Look up the target consumer in the session's map, under the map's lock.
+    ActiveMQConsumer* consumer = NULL;
+    util::Map<long long, ActiveMQConsumer*>& consumers = session->getConsumers();
+    synchronized(&consumers) {
+        consumer = consumers.getValue( data.getConsumer()->getConsumerId() );
+    }
+    if( consumer != NULL ) {
+        consumer->dispatch( data );
+    } else {
+        // No consumer will take the message: destroy it instead of leaking it.
+        delete data.getMessage();
+    }
+}
+    
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionExecutor::run() {
+
+    try {
+        // Any exception leaves the loop below and is only printed.
+        while( started ) {
+
+            // Deliver everything that has accumulated so far.
+            dispatchAll();
+
+            synchronized( &messageQueue ) {
+
+                // Wait for more data or a wakeup, unless work or a stop is pending.
+                const bool idle = messageQueue.empty() && started;
+                if( idle ) {
+                    messageQueue.wait();
+                }
+            }
+        }
+
+    } catch( ActiveMQException& ex ) {
+        ex.printStackTrace();
+    } catch( ... ) {
+        ActiveMQException ex(__FILE__, __LINE__, "caught unknown exception" );
+        ex.printStackTrace();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionExecutor::dispatchAll() {
+
+    // Move the pending entries out of the queue while holding its lock,
+    // so the dispatching below runs outside the lock taken here.
+    std::vector<DispatchData> pending;
+    synchronized( &messageQueue ) {
+        pending = messageQueue.toArray();
+        messageQueue.clear();
+    }
+
+    // Hand the entries to dispatch() in the order toArray() returned them.
+    std::vector<DispatchData>::iterator it = pending.begin();
+    for( ; it != pending.end(); ++it ) {
+        dispatch( *it );
+    }
+}
+ 
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionExecutor::wakeup() {
+    // Sync mode: drain the queue on this thread, but only once started.
+    if( !session->isSessionAsyncDispatch() ) {
+        if( started ) { dispatchAll(); }
+        return;
+    }
+    synchronized( &messageQueue ) {  // async mode: signal waiting threads
+        messageQueue.notifyAll();
+    }
+}
+

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h?view=auto&rev=519590
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h Sun Mar 18 03:54:18 2007
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_CORE_ACTIVEMQSESSIONEXECUTOR_
+#define ACTIVEMQ_CORE_ACTIVEMQSESSIONEXECUTOR_
+
+#include <activemq/core/Dispatcher.h>
+#include <activemq/concurrent/Runnable.h>
+#include <activemq/concurrent/Thread.h>
+#include <activemq/util/Queue.h>
+
+namespace activemq{
+namespace core{
+  
+    class ActiveMQSession;
+  
+    /**
+     * Delegate dispatcher for a single session.  Contains a thread
+     * to provide for asynchronous dispatching.
+     */
+    class ActiveMQSessionExecutor :
+        public Dispatcher,
+        public concurrent::Runnable
+    {
+    private:
+
+        ActiveMQSession* session;                // session whose consumers receive the dispatches
+        util::Queue<DispatchData> messageQueue;  // data waiting to be dispatched
+        bool started;                            // set by start(), cleared by stop()
+        concurrent::Thread* thread;              // dispatch thread; NULL until start() creates one
+
+    public:
+
+        /**
+         * Creates an un-started executor for the given session.
+         */
+        ActiveMQSessionExecutor( ActiveMQSession* session );
+
+        /**
+         * Calls stop() then clear().
+         */
+        virtual ~ActiveMQSessionExecutor();
+
+        /**
+         * Dispatches the data directly if the session is not async;
+         * otherwise adds it to the end of the queue.
+         * @param data - the data to be dispatched.
+         */
+        virtual void execute( DispatchData& data );
+
+        /**
+         * Dispatches the data directly if the session is not async and the
+         * executor is started; otherwise adds it to the beginning of the queue.
+         * @param data - the data to be dispatched.
+         */
+        virtual void executeFirst( DispatchData& data );
+
+        /**
+         * Starts dispatching, creating the thread if the session is async.
+         */
+        virtual void start();
+
+        /**
+         * Stops dispatching and joins the dispatch thread, if any.
+         */
+        virtual void stop();
+
+        /**
+         * Indicates if the executor is started
+         */
+        virtual bool isStarted() const {
+            return started;
+        }
+
+        /**
+         * Removes all queued messages and destroys them.
+         */
+        virtual void clear();
+
+        /**
+         * Depending on whether or not the session is async,
+         * notifies the thread or simply dispatches all available
+         * messages synchronously (the latter only when started).
+         */
+        virtual void wakeup();
+
+        /**
+         * Dispatches a message to a particular consumer.
+         * @param data - identifies the consumer to dispatch to and
+         *      carries the message to be dispatched.
+         */
+        virtual void dispatch( DispatchData& data );
+
+        /**
+         * Run method - called by the Thread class in the context
+         * of the thread.
+         */
+        virtual void run();
+
+    private:
+
+        /**
+         * Dispatches all messages currently in the queue.
+         */
+        void dispatchAll();
+    };
+    
+}}
+
+#endif /*ACTIVEMQ_CORE_ACTIVEMQSESSIONEXECUTOR_*/

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp?view=diff&rev=519590&r1=519589&r2=519590
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp Sun Mar 18 03:54:18 2007
@@ -341,28 +341,30 @@
 
         for( ; itr != messages.end(); ++itr )
         {
-            ( *itr )->setRedeliveryCount( ( *itr )->getRedeliveryCount() + 1 );
+            ActiveMQMessage* message = *itr;
+            message->setRedeliveryCount( message->getRedeliveryCount() + 1 );
 
             // Redeliver Messages at some point in the future
             Thread::sleep( redeliveryDelay );
 
-            if( ( *itr )->getRedeliveryCount() >= maxRedeliveries )
+            if( message->getRedeliveryCount() >= maxRedeliveries )
             {
                 // Poison Ack the Message, we give up processing this one
                 connection->getConnectionData()->getConnector()->
                     acknowledge(
                         session->getSessionInfo(),
                         consumer->getConsumerInfo(),
-                        dynamic_cast< Message* >( *itr ),
+                        dynamic_cast< Message* >( message ),
                         Connector::PoisonAck );
 
                 // Won't redeliver this so we kill it here.
-                delete *itr;
+                delete message;
 
                 return;
             }
 
-            consumer->onActiveMQMessage( *itr );
+            DispatchData data( consumer->getConsumerInfo(), message );
+            consumer->dispatch( data );
         }
     }
     AMQ_CATCH_RETHROW( ActiveMQException )

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.h?view=diff&rev=519590&r1=519589&r2=519590
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.h Sun Mar 18 03:54:18 2007
@@ -78,7 +78,7 @@
         // Transaction Info for the current Transaction
         connector::TransactionInfo* transactionInfo;
 
-        // Map of ActiveMQMessageListener to Messages to Rollback
+        // Map of ActiveMQMessageConsumer to Messages to Rollback
         RollbackMap rollbackMap;
 
         // Lock object to protect the rollback Map

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/DispatchData.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/DispatchData.h?view=auto&rev=519590
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/DispatchData.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/DispatchData.h Sun Mar 18 03:54:18 2007
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_CORE_DISPATCHDATA_H_
+#define ACTIVEMQ_CORE_DISPATCHDATA_H_
+
+#include <stdlib.h>
+
+namespace activemq {
+    
+    namespace connector {
+        class ConsumerInfo;
+    }
+    
+namespace core {
+
+    class ActiveMQMessage;
+    
+    /**
+     * Contains information about dispatching to a particular consumer.
+     */
+    class DispatchData {
+    private:
+    
+        connector::ConsumerInfo* consumer;
+        ActiveMQMessage* message;
+        
+    public:
+    
+        DispatchData(){
+            consumer = NULL;
+            message = NULL;
+        }
+        
+        DispatchData( connector::ConsumerInfo* consumer, ActiveMQMessage* message ) {
+            this->consumer = consumer;
+            this->message = message;
+        }
+        
+        DispatchData( const DispatchData& d ) {
+            (*this) = d;
+        }
+        
+        DispatchData& operator =( const DispatchData& d ) {
+            this->consumer = d.consumer;
+            this->message = d.message;
+            return *this;
+        }
+        
+        connector::ConsumerInfo* getConsumer() {
+            return consumer;
+        }
+        
+        ActiveMQMessage* getMessage() {
+            return message;
+        }
+          
+    };    
+}}
+
+#endif /*ACTIVEMQ_CORE_DISPATCHDATA_H_*/

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Dispatcher.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Dispatcher.h?view=auto&rev=519590
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Dispatcher.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Dispatcher.h Sun Mar 18 03:54:18 2007
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_CORE_DISPATCHER_H_
+#define ACTIVEMQ_CORE_DISPATCHER_H_
+
+#include <activemq/core/DispatchData.h>
+
+namespace activemq{
+namespace core{
+    
+    /**
+     * Interface for an object responsible for dispatching messages to 
+     * consumers.
+     */
+    class Dispatcher {
+    public:
+    
+        virtual ~Dispatcher(){}
+        
+        /**
+         * Dispatches a message to a particular consumer.
+         * @param message - the dispatch data (consumer and message) to be dispatched.
+         */
+        virtual void dispatch( DispatchData& message ) = 0;
+    };
+    
+}}
+
+#endif /*ACTIVEMQ_CORE_DISPATCHER_H_*/

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/Queue.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/Queue.h?view=diff&rev=519590&r1=519589&r2=519590
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/Queue.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/Queue.h Sun Mar 18 03:54:18 2007
@@ -17,7 +17,8 @@
 #ifndef ACTIVEMQ_UTIL_QUEUE_H
 #define ACTIVEMQ_UTIL_QUEUE_H
 
-#include <queue>
+#include <list>
+#include <vector>
 #include <activemq/concurrent/Mutex.h>
 #include <activemq/exceptions/ActiveMQException.h>
 
@@ -62,6 +63,11 @@
         virtual ~Queue(void);
 
         /**
+         * Empties this queue.
+         */
+        void clear();
+        
+        /**
          * Returns a Reference to the element at the head of the queue
          * @return reference to a queue type object or (safe)
          */
@@ -90,6 +96,12 @@
          * @param t - Queue Object Type reference.
          */
         void push( const T &t );
+        
+        /**
+         * Places a new Object at the front of the queue
+         * @param t - Queue Object Type reference.
+         */
+        void enqueueFront( const T &t );
 
         /**
          * Removes and returns the element that is at the Head of the queue
@@ -108,6 +120,19 @@
          * @return boolean indicating queue emptiness
          */
         bool empty(void) const;
+        
+        /**
+         * @return all of the values in this queue as a std::vector.
+         */
+        virtual std::vector<T> toArray() const;
+        
+        /**
+         * Reverses the order of the contents of this queue and stores them
+         * in the target queue.
+         * @param target - The target queue that will receive the contents of
+         * this queue in reverse order.
+         */
+        void reverse( Queue<T>& target ) const;
    
         /**
          * Locks the object.
@@ -176,7 +201,7 @@
     private:
 
         // The real queue
-        std::queue<T> queue;
+        std::list<T> queue;
 
         // Object used for sync
         concurrent::Mutex mutex;
@@ -190,7 +215,7 @@
     //-----{ Static Init }----------------------------------------------------//
     template <typename T>
     T Queue<T>::safe;
-   
+    
     //-----{ Retrieve current length of Queue }-------------------------------//
    
     template <typename T> inline size_t Queue<T>::size() const
@@ -218,12 +243,24 @@
     {
     }
 
+    template <typename T> 
+    void Queue<T>::clear()
+    {
+        queue.clear();
+    }
+    
     //-----{ Add Elements to Back of Queue }----------------------------------//
 
     template <typename T> 
     void Queue<T>::push( const T &t )
     {
-        queue.push( t );
+        queue.push_back( t );
+    }
+    
+    template <typename T> 
+    void Queue<T>::enqueueFront( const T &t )
+    {
+        queue.push_front( t );
     }
 
     //-----{ Remove Elements from Front of Queue }----------------------------//
@@ -239,7 +276,7 @@
         // Pop the element into a temp, since we need to remain locked.
         // this means getting front and then popping.
         T temp = queue.front();
-        queue.pop();
+        queue.pop_front();
    
         return temp;
     }
@@ -294,6 +331,30 @@
         }
    
         return queue.back();
+    }
+    
+    template <typename T> 
+    void Queue<T>::reverse( Queue<T>& target ) const 
+    {
+        typename std::list<T>::const_reverse_iterator iter;
+        iter = queue.rbegin();
+        for( ; iter != queue.rend(); ++iter ) {
+            target.push( *iter );
+        }
+    }
+    
+    ////////////////////////////////////////////////////////////////////////////
+    template <typename T>
+    std::vector<T> Queue<T>::toArray() const{
+        std::vector<T> valueArray(queue.size());
+        
+        typename std::list<T>::const_iterator iter;
+        iter=queue.begin();
+        for( int ix=0; iter != queue.end(); ++iter, ++ix ){
+            valueArray[ix] = *iter;
+        }
+        
+        return valueArray;
     }
 
 }}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.h?view=diff&rev=519590&r1=519589&r2=519590
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.h Sun Mar 18 03:54:18 2007
@@ -43,13 +43,13 @@
          * @throws CMSException
          */
         virtual void close() throw( CMSException ) = 0;
-        
-        /**
-         * Creates a new Session to work for this Connection
+
+		/**
+         * Creates an AUTO_ACKNOWLEDGE Session.
          * @throws CMSException
          */
         virtual Session* createSession() throw ( CMSException ) = 0;
-
+				
         /**
          * Creates a new Session to work for this Connection using the
          * specified acknowledgment mode

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTempDestinationTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTempDestinationTest.cpp?view=diff&rev=519590&r1=519589&r2=519590
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTempDestinationTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTempDestinationTest.cpp Sun Mar 18 03:54:18 2007
@@ -102,7 +102,7 @@
         // Create CMS Object for Comms
         cms::Session* session = testSupport.getSession();
 
-        cms::Destination* requestTopic = session->createTopic( "myRequestTopic" );
+        cms::Destination* requestTopic = session->createTopic( Guid::createGUIDString() );
         cms::Destination* responseTopic = session->createTemporaryTopic();
 
         Consumer* requestConsumer = new Consumer( testSupport.getConnection(),

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompSessionManagerTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompSessionManagerTest.h?view=diff&rev=519590&r1=519589&r2=519590
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompSessionManagerTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompSessionManagerTest.h Sun Mar 18 03:54:18 2007
@@ -139,6 +139,8 @@
             delete info2;
             delete info3;
             delete info4;
+            
+            delete connector;
         }
 
         void testConsumers()
@@ -194,6 +196,8 @@
             delete cinfo2;
             delete cinfo3;
             delete cinfo4;
+            
+            delete connector;
         }
 
         void testCommand()
@@ -260,6 +264,8 @@
             delete cinfo2;
             delete cinfo3;
             delete cinfo4;
+            
+            delete connector;
         }
 
         void testSendingCommands(){
@@ -304,6 +310,8 @@
 
             delete cinfo1;
             delete cinfo2;
+            
+            delete connector;
         }
 
         void testSubscribeOptions(){
@@ -427,6 +435,8 @@
 
             // Done
             delete session;
+            
+            delete connector;
 
         }
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.h?view=diff&rev=519590&r1=519589&r2=519590
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.h Sun Mar 18 03:54:18 2007
@@ -110,19 +110,19 @@
             }
         };
 
-        class MyActiveMQMessageListener : public ActiveMQMessageListener
+        class MyDispatcher : public Dispatcher
         {
         public:
 
             std::vector<ActiveMQMessage*> messages;
 
         public:
-            virtual ~MyActiveMQMessageListener(){}
+            virtual ~MyDispatcher(){}
 
-            virtual void onActiveMQMessage( ActiveMQMessage* message )
+            virtual void dispatch( DispatchData& data )
                 throw ( exceptions::ActiveMQException )
             {
-                messages.push_back( message );
+                messages.push_back( data.getMessage() );
             }
         };
 
@@ -136,7 +136,7 @@
                 MyMessageListener listener;
                 MyExceptionListener exListener;
                 MyCommandListener cmdListener;
-                MyActiveMQMessageListener msgListener;
+                MyDispatcher msgListener;
                 std::string connectionId = "testConnectionId";
                 util::SimpleProperties* properties =
                     new util::SimpleProperties();
@@ -198,7 +198,7 @@
                 consumer.setSessionInfo( &session );
                 consumer.setDestination( &myTopic );
 
-                connection.addMessageListener( 1, &msgListener );
+                connection.addDispatcher( &consumer, &msgListener );
 
                 connector::stomp::commands::TextMessageCommand* cmd =
                     new connector::stomp::commands::TextMessageCommand;
@@ -217,14 +217,14 @@
 
                 CPPUNIT_ASSERT( msgListener.messages.size() == 1 );
 
-                connection.removeMessageListener( 1 );
+                connection.removeDispatcher( &consumer );
 
                 msgListener.messages.clear();
                 consumerListener->onConsumerMessage( &consumer, cmd );
 
                 CPPUNIT_ASSERT( msgListener.messages.size() == 0 );
 
-                connection.addMessageListener( 1, &msgListener );
+                connection.addDispatcher( &consumer, &msgListener );
 
                 connection.stop();
                 consumerListener->onConsumerMessage( &consumer, cmd );
@@ -239,7 +239,7 @@
                 consumerListener->onConsumerMessage( &consumer, cmd );
                 CPPUNIT_ASSERT( msgListener.messages.size() == 1 );
 
-                connection.removeMessageListener( 1 );
+                connection.removeDispatcher( &consumer );
                 msgListener.messages.clear();
 
                 session1->close();

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h?view=diff&rev=519590&r1=519589&r2=519590
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h Sun Mar 18 03:54:18 2007
@@ -368,7 +368,7 @@
                 }
             }
 
-            CPPUNIT_ASSERT( msgListener1.messages.size() == 1 );
+            CPPUNIT_ASSERT_EQUAL( 1, (int)msgListener1.messages.size() );
 
             msgListener1.messages[0]->acknowledge();
 
@@ -382,7 +382,7 @@
                 }
             }
 
-            CPPUNIT_ASSERT( msgListener2.messages.size() == 1 );
+            CPPUNIT_ASSERT_EQUAL( 1, (int)msgListener2.messages.size() );
             
             msgListener2.messages[0]->acknowledge();
 
@@ -459,7 +459,7 @@
                 }
             }
 
-            CPPUNIT_ASSERT( msgListener1.messages.size() == 1 );
+            CPPUNIT_ASSERT_EQUAL( 1, (int)msgListener1.messages.size() );
 
             session->commit();