You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by nm...@apache.org on 2007/03/25 16:55:03 UTC

svn commit: r522273 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core: ActiveMQSession.cpp ActiveMQSessionExecutor.cpp ActiveMQSessionExecutor.h

Author: nmittler
Date: Sun Mar 25 07:55:02 2007
New Revision: 522273

URL: http://svn.apache.org/viewvc?view=rev&rev=522273
Log:
AMQCPP-83 - updates to purge messages from session executor for closing connection

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?view=diff&rev=522273&r1=522272&r2=522273
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Sun Mar 25 07:55:02 2007
@@ -722,6 +722,7 @@
 
         if( consumer != NULL )
         {
+            // If the executor thread is currently running, stop it.
             bool wasStarted = isStarted();
             if( wasStarted ) {
                 stop();
@@ -736,12 +737,35 @@
                 transaction->removeFromTransaction(
                     consumer->getConsumerId() );
             }
-
-            // Remove this consumer from the consumers map.
+            
+            ActiveMQConsumer* obj = NULL;
             synchronized( &consumers ) {
-                consumers.remove( consumer->getConsumerId() );
+                
+                if( consumers.containsKey( consumer->getConsumerId() ) ) {
+                    
+                    // Get the consumer reference
+                    obj = consumers.getValue( consumer->getConsumerId() );
+
+                    // Remove this consumer from the map.
+                    consumers.remove( consumer->getConsumerId() );
+                }
+            }
+            
+            // Clean up any resources in the executor for this consumer
+            if( obj != NULL && executor != NULL ) {              
+            
+                // Purge any pending messages for this consumer.
+                vector<ActiveMQMessage*> messages = 
+                    executor->purgeConsumerMessages(obj);
+                    
+                // Destroy the messages.
+                for( unsigned int ix=0; ix<messages.size(); ++ix ) {
+                    delete messages[ix];
+                }
             }
 
+            // If the executor thread was previously running, start it back
+            // up.
             if( wasStarted ) {
                 start();
             }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp?view=diff&rev=522273&r1=522272&r2=522273
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp Sun Mar 25 07:55:02 2007
@@ -21,6 +21,7 @@
 #include "ActiveMQConsumer.h"
 #include <activemq/connector/ConsumerInfo.h>
 
+using namespace std;
 using namespace activemq;
 using namespace activemq::core;
 using namespace activemq::util;
@@ -31,6 +32,7 @@
 ActiveMQSessionExecutor::ActiveMQSessionExecutor( ActiveMQSession* session ) {
 
     this->session = session;
+    this->closed = false;
     this->started = false;
     this->thread = NULL;
 }
@@ -40,9 +42,9 @@
 
     try {
 
-        // Stop the thread if it's running.
-        stop();
-
+        // Terminate the thread.
+        close();
+        
         // Empty the message queue and destroy any remaining messages.
         clear();
     }
@@ -50,12 +52,28 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionExecutor::close() {
+    
+    synchronized( &mutex ) {
+
+        closed = true;
+        mutex.notifyAll();
+    }
+
+    if( thread != NULL ) {
+        thread->join();
+        delete thread;
+        thread = NULL;
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSessionExecutor::execute( DispatchData& data ) {
 
     // Add the data to the queue.
-    synchronized( &messageQueue ) {
-        messageQueue.push( data );
-        wakeup();
+    synchronized( &mutex ) {
+        messageQueue.push_back( data );
+        mutex.notifyAll();
     }
 }
 
@@ -63,16 +81,47 @@
 void ActiveMQSessionExecutor::executeFirst( DispatchData& data ) {
 
     // Add the data to the front of the queue.
-    synchronized( &messageQueue ) {
-        messageQueue.enqueueFront( data );
-        wakeup();
+    synchronized( &mutex ) {
+        messageQueue.push_front( data );
+        mutex.notifyAll();
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSessionExecutor::start() {
+vector<ActiveMQMessage*> ActiveMQSessionExecutor::purgeConsumerMessages( 
+    ActiveMQConsumer* consumer )
+{
+    vector<ActiveMQMessage*> retVal;
+    
+    const connector::ConsumerInfo* consumerInfo = consumer->getConsumerInfo();
+    
+    synchronized( &mutex ) {
+        
+        list<DispatchData>::iterator iter = messageQueue.begin();
+        while( iter != messageQueue.end() ) {
+            list<DispatchData>::iterator currentIter = iter;
+            DispatchData& dispatchData = *iter++;
+            if( consumerInfo == dispatchData.getConsumer() ||
+                consumerInfo->getConsumerId() == dispatchData.getConsumer()->getConsumerId() ) 
+            {
+                retVal.push_back( dispatchData.getMessage() );
+                messageQueue.erase(currentIter);
+            }
+        }
+    }
+    
+    return retVal;
+}
 
-    synchronized( &messageQueue ) {
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionExecutor::start() {
+    
+    synchronized( &mutex ) {
+        
+        if( closed || started ) { 
+            return;
+        }
+    
         started = true;
 
         // Don't create the thread unless we need to.
@@ -81,37 +130,41 @@
             thread->start();
         }
 
-        wakeup();
+        mutex.notifyAll();
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSessionExecutor::stop() {
+    
+    synchronized( &mutex ) {
+        
+        if( closed || !started ) {
+            return;
+        }
 
-    synchronized( &messageQueue ) {
-
+        // Set the state to stopped.
         started = false;
-        wakeup();
-    }
-
-    if( thread != NULL ) {
-        thread->join();
-        delete thread;
-        thread = NULL;
+        
+        // Wakeup the thread so that it can acknowledge the stop request.
+        mutex.notifyAll();
+        
+        // Wait for the thread to notify us that it has acknowledged
+        // the stop request.
+        mutex.wait();
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSessionExecutor::clear() {
 
-    synchronized( &messageQueue ) {
+    synchronized( &mutex ) {
 
-        while( !messageQueue.empty() ) {
-            DispatchData data = messageQueue.pop();
+        list<DispatchData>::iterator iter = messageQueue.begin();
+        while( iter != messageQueue.end() ) {
+            DispatchData data = *iter++;
             delete data.getMessage();
         }
-
-        wakeup();
     }
 }
 
@@ -138,7 +191,7 @@
     } catch( ActiveMQException& ex ) {
         ex.setMark(__FILE__, __LINE__ );
         ex.printStackTrace();
-    } catch( std::exception& ex ) {
+    } catch( exception& ex ) {
         ActiveMQException amqex( __FILE__, __LINE__, ex.what() );
         amqex.printStackTrace();
     } catch( ... ) {
@@ -152,17 +205,29 @@
 
     try {
 
-        while( started ) {
+        while( true ) {
 
             // Dispatch all currently available messages.
             dispatchAll();
 
-            synchronized( &messageQueue ) {
+            synchronized( &mutex ) {
+                
+                // If we're closing down, exit the thread.
+                if( closed ) {
+                    return;
+                }
+                
+                // When told to stop, the calling thread will wait for a
+                // responding notification, indicating that we have acknowledged
+                // the stop command.
+                if( !started ) {
+                    mutex.notifyAll();
+                }
 
-                if( messageQueue.empty() && started ) {
+                if( messageQueue.empty() || !started ) {
 
                     // Wait for more data or to be woken up.
-                    messageQueue.wait();
+                    mutex.wait();
                 }
             }
         }
@@ -170,7 +235,7 @@
     } catch( ActiveMQException& ex ) {
         ex.setMark(__FILE__, __LINE__ );
         session->fire( ex );
-    } catch( std::exception& stdex ) {
+    } catch( exception& stdex ) {
         ActiveMQException ex( __FILE__, __LINE__, stdex.what() );
         session->fire( ex );
     } catch( ... ) {
@@ -181,27 +246,32 @@
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSessionExecutor::dispatchAll() {
-
+    
     // Take out all of the dispatch data currently in the array.
-    std::vector<DispatchData> dataList;
-    synchronized( &messageQueue ) {
+    list<DispatchData> dataList;
+    synchronized( &mutex ) {
+        
+        // When told to stop, the calling thread will wait for a
+        // responding notification, indicating that we have acknowledged
+        // the stop command.
+        if( !started ) {
+            mutex.notifyAll();
+        }
+                
+        if( !started || closed ) {
+            return;
+        }
 
-        dataList = messageQueue.toArray();
+        dataList = messageQueue;
         messageQueue.clear();
     }
-
+    
     // Dispatch all currently available messages.
-    for( unsigned int ix=0; ix<dataList.size(); ++ix ) {
-        DispatchData& data = dataList[ix];
+    list<DispatchData>::iterator iter = dataList.begin();
+    while( iter != dataList.end() ) {
+        DispatchData& data = *iter++;
         dispatch( data );
     }
 }
 
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSessionExecutor::wakeup() {
-
-    synchronized( &messageQueue ) {
-        messageQueue.notifyAll();
-    }
-}
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h?view=diff&rev=522273&r1=522272&r2=522273
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h Sun Mar 25 07:55:02 2007
@@ -21,12 +21,15 @@
 #include <activemq/core/Dispatcher.h>
 #include <activemq/concurrent/Runnable.h>
 #include <activemq/concurrent/Thread.h>
-#include <activemq/util/Queue.h>
+#include <activemq/concurrent/Mutex.h>
+#include <vector>
+#include <list>
 
 namespace activemq{
 namespace core{
   
     class ActiveMQSession;
+    class ActiveMQConsumer;
   
     /**
      * Delegate dispatcher for a single session.  Contains a thread
@@ -39,9 +42,11 @@
     private:
         
         ActiveMQSession* session;
-        util::Queue<DispatchData> messageQueue;
-        bool started;
+        std::list<DispatchData> messageQueue;        
         concurrent::Thread* thread;
+        concurrent::Mutex mutex;
+        bool started;
+        bool closed;
         
     public:
     
@@ -70,6 +75,14 @@
         virtual void executeFirst( DispatchData& data );
             
         /**
+         * Removes all messages for the given consumer from the queue and
+         * returns them.
+         * @param consumer the subject consmer
+         * @return all messages that were queued for the consumer.
+         */
+        virtual std::vector<ActiveMQMessage*> purgeConsumerMessages( ActiveMQConsumer* consumer );
+        
+        /**
          * Starts the dispatching.
          */
         virtual void start();
@@ -80,6 +93,12 @@
         virtual void stop();
         
         /**
+         * Terminates the dispatching thread.  Once this is called, the executor is no longer
+         * usable.
+         */         
+        virtual void close();
+        
+        /**
          * Indicates if the executor is started
          */
         virtual bool isStarted() const {
@@ -90,13 +109,6 @@
          * Removes all queued messgaes and destroys them.
          */
         virtual void clear();
-        
-        /**
-         * Depending on whether or not the session is async,
-         * notifies the thread or simply dispatches all available
-         * messages synchronously.
-         */
-        virtual void wakeup();
             
         /**
          * Dispatches a message to a particular consumer.