You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by nm...@apache.org on 2007/01/24 22:54:59 UTC
svn commit: r499583 - in
/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main:
activemq/core/ActiveMQConnection.cpp activemq/core/ActiveMQConnection.h
activemq/core/ActiveMQSession.cpp activemq/core/ActiveMQSession.h
cms/Connection.h cms/Session.h
Author: nmittler
Date: Wed Jan 24 13:54:58 2007
New Revision: 499583
URL: http://svn.apache.org/viewvc?view=rev&rev=499583
Log:
Making Connection.close() also close any non-closed child sessions.
Modified:
incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.h
incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Session.h
Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?view=diff&rev=499583&r1=499582&r2=499583
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Wed Jan 24 13:54:58 2007
@@ -73,10 +73,18 @@
{
try
{
- return new ActiveMQSession(
+ // Create the session instance.
+ ActiveMQSession* session = new ActiveMQSession(
connectionData->getConnector()->createSession( ackMode ),
connectionData->getProperties(),
this );
+
+ // Add the session to the set of active sessions.
+ synchronized( &activeSessions ) {
+ activeSessions.add( session );
+ }
+
+ return session;
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
@@ -97,6 +105,22 @@
{
return;
}
+
+ // Get the complete list of active sessions.
+ std::vector<cms::Session*> allSessions;
+ synchronized( &activeSessions ) {
+ allSessions = activeSessions.toArray();
+ }
+
+ // Close all of the resources.
+ for( unsigned int ix=0; ix<allSessions.size(); ++ix ){
+ cms::Session* session = allSessions[ix];
+ try{
+ session->close();
+ } catch( cms::CMSException& ex ){
+ /* Absorb */
+ }
+ }
// Once current deliveries are done this stops the delivery
// of any new messages.
@@ -136,9 +160,9 @@
ActiveMQMessageListener* listener )
{
// Place in Map
- synchronized( &mutex )
+ synchronized( &consumers )
{
- consumers[consumerId] = listener;
+ consumers.setValue( consumerId, listener );
}
}
@@ -146,9 +170,9 @@
void ActiveMQConnection::removeMessageListener( const unsigned int consumerId )
{
// Remove from Map
- synchronized( &mutex )
+ synchronized( &consumers )
{
- consumers.erase( consumerId );
+ consumers.remove( consumerId );
}
}
@@ -187,12 +211,14 @@
}
// Started, so lock map and dispatch the message.
- synchronized( &mutex )
+ synchronized( &consumers )
{
- if(consumers.find( consumer->getConsumerId()) != consumers.end() )
+ if( consumers.containsKey(consumer->getConsumerId()) )
{
- consumers[consumer->getConsumerId()]->
- onActiveMQMessage( message );
+ ActiveMQMessageListener* listener =
+ consumers.getValue(consumer->getConsumerId());
+
+ listener->onActiveMQMessage( message );
}
}
}
@@ -217,5 +243,27 @@
if( exceptionListener != NULL ){
exceptionListener->onException( ex );
}
-}
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::removeSession( ActiveMQSession* session )
+ throw ( cms::CMSException )
+{
+ try
+ {
+ // Remove this session from the set of active sessions.
+ synchronized( &activeSessions ) {
+ activeSessions.remove( session );
+ }
+
+ // Destroy this sessions resources
+ getConnectionData()->
+ getConnector()->destroyResource( session->getSessionInfo() );
+
+ }
+ AMQ_CATCH_RETHROW( ActiveMQException )
+ AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+
Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?view=diff&rev=499583&r1=499582&r2=499583
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h Wed Jan 24 13:54:58 2007
@@ -14,27 +14,33 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
#ifndef _ACTIVEMQ_CORE_ACTIVEMQCONNECTION_H_
#define _ACTIVEMQ_CORE_ACTIVEMQCONNECTION_H_
#include <cms/Connection.h>
#include <cms/ExceptionListener.h>
-#include <activemq/concurrent/Mutex.h>
#include <activemq/core/ActiveMQConnectionData.h>
#include <activemq/core/ActiveMQMessageListener.h>
#include <activemq/core/ActiveMQMessage.h>
#include <activemq/connector/ConsumerMessageListener.h>
#include <activemq/util/Properties.h>
+#include <activemq/util/Map.h>
+#include <activemq/util/Set.h>
-#include <map>
#include <string>
namespace activemq{
namespace core{
- class cms::Session;
+ class cms::Session;
+ class ActiveMQSession;
class ActiveMQConsumer;
+ /**
+ * Concrete connection used for all connectors to the
+ * ActiveMQ broker.
+ */
class ActiveMQConnection :
public cms::Connection,
public connector::ConsumerMessageListener,
@@ -42,24 +48,36 @@
{
private:
- // the registered exception listener
+ /**
+ * the registered exception listener
+ */
cms::ExceptionListener* exceptionListener;
- // All the data that is used to connect this Connection
+ /**
+ * All the data that is used to connect this Connection
+ */
ActiveMQConnectionData* connectionData;
- // Indicates if this Connection is started
+ /**
+ * Indicates if this Connection is started
+ */
bool started;
- // Indicates that this connection has been closed, it is no longer
- // usable after this becomes true
+ /**
+ * Indicates that this connection has been closed, it is no longer
+ * usable after this becomes true
+ */
bool closed;
- // Map of Consumer Ids to ActiveMQMessageListeners
- std::map< unsigned int, ActiveMQMessageListener* > consumers;
-
- // Mutex to lock the Consumers Map
- concurrent::Mutex mutex;
+ /**
+ * Map of Consumer Ids to ActiveMQMessageListeners
+ */
+ util::Map< unsigned int, ActiveMQMessageListener* > consumers;
+
+ /**
+ * Maintain the set of all active sessions.
+ */
+ util::Set<cms::Session*> activeSessions;
public:
@@ -71,6 +89,13 @@
virtual ~ActiveMQConnection();
+ /**
+ * Removes the session resources for the given session
+ * instance.
+ * @param session The session to be unregistered from this connection.
+ */
+ virtual void removeSession( ActiveMQSession* session ) throw ( cms::CMSException );
+
public: // Connection Interface Methods
/**
@@ -118,7 +143,9 @@
};
/**
- * Close the currently open connection
+ * Closes this connection as well as any Sessions
+ * created from it (and those Sessions' consumers and
+ * producers).
* @throws CMSException
*/
virtual void close() throw ( cms::CMSException );
Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?view=diff&rev=499583&r1=499582&r2=499583
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Wed Jan 24 13:54:58 2007
@@ -105,8 +105,7 @@
}
// Destroy this sessions resources
- connection->getConnectionData()->
- getConnector()->destroyResource( sessionInfo );
+ connection->removeSession( this );
sessionInfo = NULL;
// Now indicate that this session is closed.
Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h?view=diff&rev=499583&r1=499582&r2=499583
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h Wed Jan 24 13:54:58 2007
@@ -74,8 +74,9 @@
public: // Implements Mehtods
/**
- * Closes the Session
- * @throw CMSException
+ * Closes this session as well as any active child consumers or
+ * producers.
+ * @throws CMSException
*/
virtual void close() throw ( cms::CMSException );
Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.h?view=diff&rev=499583&r1=499582&r2=499583
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.h (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.h Wed Jan 24 13:54:58 2007
@@ -37,10 +37,18 @@
virtual ~Connection(void) {}
/**
+ * Closes this connection as well as any Sessions
+ * created from it (and those Sessions' consumers and
+ * producers).
+ * @throws CMSException
+ */
+ virtual void close() throw( CMSException ) = 0;
+
+ /**
* Creates a new Session to work for this Connection
* @throws CMSException
*/
- virtual Session* createSession(void) throw ( CMSException ) = 0;
+ virtual Session* createSession() throw ( CMSException ) = 0;
/**
* Creates a new Session to work for this Connection using the
@@ -55,13 +63,13 @@
* Get the Client Id for this session
* @return Client Id String
*/
- virtual std::string getClientId(void) const = 0;
+ virtual std::string getClientId() const = 0;
/**
* Gets the registered Exception Listener for this connection
* @return pointer to an exception listnener or NULL
*/
- virtual ExceptionListener* getExceptionListener(void) const = 0;
+ virtual ExceptionListener* getExceptionListener() const = 0;
/**
* Sets the registed Exception Listener for this connection
Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Session.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Session.h?view=diff&rev=499583&r1=499582&r2=499583
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Session.h (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Session.h Wed Jan 24 13:54:58 2007
@@ -77,6 +77,13 @@
virtual ~Session(void) {}
/**
+ * Closes this session as well as any active child consumers or
+ * producers.
+ * @throws CMSException
+ */
+ virtual void close() throw( CMSException ) = 0;
+
+ /**
* Commits all messages done in this transaction and releases any
* locks currently held.
* @throws CMSException