You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by nm...@apache.org on 2007/01/24 22:54:59 UTC

svn commit: r499583 - in /incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main: activemq/core/ActiveMQConnection.cpp activemq/core/ActiveMQConnection.h activemq/core/ActiveMQSession.cpp activemq/core/ActiveMQSession.h cms/Connection.h cms/Session.h

Author: nmittler
Date: Wed Jan 24 13:54:58 2007
New Revision: 499583

URL: http://svn.apache.org/viewvc?view=rev&rev=499583
Log:
Making Connection.close() also close any non-closed child sessions.

Modified:
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.h
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Session.h

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?view=diff&rev=499583&r1=499582&r2=499583
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Wed Jan 24 13:54:58 2007
@@ -73,10 +73,18 @@
 {
     try
     {
-        return new ActiveMQSession(
+        // Create the session instance.
+        ActiveMQSession* session = new ActiveMQSession(
             connectionData->getConnector()->createSession( ackMode ), 
             connectionData->getProperties(),
             this );
+        
+        // Add the session to the set of active sessions.
+        synchronized( &activeSessions ) {
+            activeSessions.add( session );
+        } 
+            
+        return session;
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCHALL_THROW( ActiveMQException )
@@ -97,6 +105,22 @@
         {
             return;
         }
+        
+        // Get the complete list of active sessions.
+        std::vector<cms::Session*> allSessions;
+        synchronized( &activeSessions ) {
+            allSessions = activeSessions.toArray();
+        }
+        
+        // Close all of the resources.
+        for( unsigned int ix=0; ix<allSessions.size(); ++ix ){
+            cms::Session* session = allSessions[ix];
+            try{
+                session->close();
+            } catch( cms::CMSException& ex ){
+                /* Absorb */
+            }
+        }
 
         // Once current deliveries are done this stops the delivery 
         // of any new messages.
@@ -136,9 +160,9 @@
                                              ActiveMQMessageListener* listener )
 {
     // Place in Map
-    synchronized( &mutex )
+    synchronized( &consumers )
     {
-        consumers[consumerId] = listener;
+        consumers.setValue( consumerId, listener );
     }
 }
   
@@ -146,9 +170,9 @@
 void ActiveMQConnection::removeMessageListener( const unsigned int consumerId )
 {
     // Remove from Map
-    synchronized( &mutex )
+    synchronized( &consumers )
     {
-        consumers.erase( consumerId );
+        consumers.remove( consumerId );
     }
 }
 
@@ -187,12 +211,14 @@
         }
         
         // Started, so lock map and dispatch the message.
-        synchronized( &mutex )
+        synchronized( &consumers )
         {
-            if(consumers.find( consumer->getConsumerId()) != consumers.end() )
+            if( consumers.containsKey(consumer->getConsumerId()) )
             {
-                consumers[consumer->getConsumerId()]->
-                    onActiveMQMessage( message );
+                ActiveMQMessageListener* listener = 
+                    consumers.getValue(consumer->getConsumerId());
+                    
+                listener->onActiveMQMessage( message );
             }
         }        
     }
@@ -217,5 +243,27 @@
     if( exceptionListener != NULL ){
         exceptionListener->onException( ex );
     }
-}                                        
+}              
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::removeSession( ActiveMQSession* session ) 
+    throw ( cms::CMSException ) 
+{
+    try
+    {
+        // Remove this session from the set of active sessions.
+        synchronized( &activeSessions ) {
+            activeSessions.remove( session );
+        }
+        
+        // Destroy this session's resources
+        getConnectionData()->
+            getConnector()->destroyResource( session->getSessionInfo() );
+            
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+                          
 

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?view=diff&rev=499583&r1=499582&r2=499583
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h Wed Jan 24 13:54:58 2007
@@ -14,27 +14,33 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+ 
 #ifndef _ACTIVEMQ_CORE_ACTIVEMQCONNECTION_H_
 #define _ACTIVEMQ_CORE_ACTIVEMQCONNECTION_H_
 
 #include <cms/Connection.h>
 #include <cms/ExceptionListener.h>
-#include <activemq/concurrent/Mutex.h>
 #include <activemq/core/ActiveMQConnectionData.h>
 #include <activemq/core/ActiveMQMessageListener.h>
 #include <activemq/core/ActiveMQMessage.h>
 #include <activemq/connector/ConsumerMessageListener.h>
 #include <activemq/util/Properties.h>
+#include <activemq/util/Map.h>
+#include <activemq/util/Set.h>
 
-#include <map>
 #include <string>
 
 namespace activemq{
 namespace core{
 
-    class cms::Session;   
+    class cms::Session;  
+    class ActiveMQSession; 
     class ActiveMQConsumer;
 
+    /**
+     * Concrete connection used for all connectors to the
+     * ActiveMQ broker.
+     */
     class ActiveMQConnection : 
         public cms::Connection,
         public connector::ConsumerMessageListener,
@@ -42,24 +48,36 @@
     {
     private:
    
-        // the registered exception listener
+        /**
+         * the registered exception listener
+         */
         cms::ExceptionListener* exceptionListener;
       
-        // All the data that is used to connect this Connection
+        /**
+         * All the data that is used to connect this Connection
+         */
         ActiveMQConnectionData* connectionData;
       
-        // Indicates if this Connection is started
+        /**
+         * Indicates if this Connection is started
+         */
         bool started;
 
-        // Indicates that this connection has been closed, it is no longer
-        // usable after this becomes true
+        /**
+         * Indicates that this connection has been closed, it is no longer
+         * usable after this becomes true
+         */
         bool closed;
       
-        // Map of Consumer Ids to ActiveMQMessageListeners
-        std::map< unsigned int, ActiveMQMessageListener* > consumers;
-      
-        // Mutex to lock the Consumers Map
-        concurrent::Mutex mutex;
+        /**
+         * Map of Consumer Ids to ActiveMQMessageListeners
+         */
+        util::Map< unsigned int, ActiveMQMessageListener* > consumers;
+        
+        /**
+         * Maintain the set of all active sessions.
+         */
+        util::Set<cms::Session*> activeSessions;
    
     public:
 
@@ -71,6 +89,13 @@
 
         virtual ~ActiveMQConnection();
    
+        /**
+         * Removes the session resources for the given session
+         * instance.
+         * @param session The session to be unregistered from this connection.
+         */
+        virtual void removeSession( ActiveMQSession* session ) throw ( cms::CMSException );
+        
     public:   // Connection Interface Methods
    
         /**
@@ -118,7 +143,9 @@
         };
          
         /**
-         * Close the currently open connection
+         * Closes this connection as well as any Sessions 
+         * created from it (and those Sessions' consumers and
+         * producers).
          * @throws CMSException
          */
         virtual void close() throw ( cms::CMSException );

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?view=diff&rev=499583&r1=499582&r2=499583
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Wed Jan 24 13:54:58 2007
@@ -105,8 +105,7 @@
         }
                 
         // Destroy this sessions resources
-        connection->getConnectionData()->
-            getConnector()->destroyResource( sessionInfo );
+        connection->removeSession( this );
         sessionInfo = NULL;
         
         // Now indicate that this session is closed.

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h?view=diff&rev=499583&r1=499582&r2=499583
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h Wed Jan 24 13:54:58 2007
@@ -74,8 +74,9 @@
     public:   // Implements Mehtods
    
         /**
-         * Closes the Session
-         * @throw CMSException
+         * Closes this session as well as any active child consumers or
+         * producers.
+         * @throws CMSException
          */
         virtual void close() throw ( cms::CMSException );
       

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.h?view=diff&rev=499583&r1=499582&r2=499583
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.h (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.h Wed Jan 24 13:54:58 2007
@@ -37,10 +37,18 @@
         virtual ~Connection(void) {}
 
         /**
+         * Closes this connection as well as any Sessions 
+         * created from it (and those Sessions' consumers and
+         * producers).
+         * @throws CMSException
+         */
+        virtual void close() throw( CMSException ) = 0;
+        
+        /**
          * Creates a new Session to work for this Connection
          * @throws CMSException
          */
-        virtual Session* createSession(void) throw ( CMSException ) = 0;
+        virtual Session* createSession() throw ( CMSException ) = 0;
 
         /**
          * Creates a new Session to work for this Connection using the
@@ -55,13 +63,13 @@
          * Get the Client Id for this session
          * @return Client Id String
          */
-        virtual std::string getClientId(void) const = 0;      
+        virtual std::string getClientId() const = 0;      
 
         /**
          * Gets the registered Exception Listener for this connection
          * @return pointer to an exception listnener or NULL
          */
-        virtual ExceptionListener* getExceptionListener(void) const = 0;
+        virtual ExceptionListener* getExceptionListener() const = 0;
 
         /**
          * Sets the registed Exception Listener for this connection

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Session.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Session.h?view=diff&rev=499583&r1=499582&r2=499583
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Session.h (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Session.h Wed Jan 24 13:54:58 2007
@@ -77,6 +77,13 @@
         virtual ~Session(void) {}
 
         /**
+         * Closes this session as well as any active child consumers or
+         * producers.
+         * @throws CMSException
+         */
+        virtual void close() throw( CMSException ) = 0;
+        
+        /**
          * Commits all messages done in this transaction and releases any 
          * locks currently held.
          * @throws CMSException