You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2007/03/18 00:54:03 UTC
svn commit: r519474 [2/2] - in
/activemq/activemq-cpp/trunk/activemq-cpp/src: main/
main/activemq/connector/ main/activemq/connector/openwire/
main/activemq/connector/openwire/commands/ main/activemq/connector/stomp/
main/activemq/core/ main/cms/ test-...
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp?view=diff&rev=519474&r1=519473&r2=519474
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp Sat Mar 17 16:54:01 2007
@@ -19,6 +19,7 @@
#include <activemq/core/ActiveMQSession.h>
#include <activemq/exceptions/NullPointerException.h>
#include <activemq/exceptions/InvalidStateException.h>
+#include <activemq/exceptions/IllegalArgumentException.h>
#include <activemq/util/Date.h>
using namespace std;
@@ -49,6 +50,9 @@
this->disableTimestamps = false;
this->defaultPriority = 4;
this->defaultTimeToLive = 0;
+
+ // Listen for our resource to close
+ this->producerInfo->addListener( this );
}
////////////////////////////////////////////////////////////////////////////////
@@ -57,6 +61,8 @@
try
{
close();
+
+ delete producerInfo;
}
AMQ_CATCH_NOTHROW( ActiveMQException )
AMQ_CATCHALL_NOTHROW( )
@@ -69,8 +75,13 @@
try
{
if( !closed ) {
- // Dispose of the ProducerInfo
- session->onDestroySessionResource( this );
+
+ // Close the ProducerInfo
+ if( !producerInfo->isClosed() ) {
+ // We don't want a callback now
+ this->producerInfo->removeListener( this );
+ this->producerInfo->close();
+ }
closed = true;
}
@@ -154,11 +165,11 @@
__FILE__, __LINE__,
"ActiveMQProducer::send - This Producer is closed" );
}
-
+
if( destination == NULL ) {
-
- throw InvalidStateException(
- __FILE__, __LINE__,
+
+ throw InvalidStateException(
+ __FILE__, __LINE__,
"ActiveMQProducer::send - Attempting to send on NULL destination");
}
@@ -185,3 +196,30 @@
AMQ_CATCHALL_THROW( ActiveMQException )
}
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQProducer::onConnectorResourceClosed(
+    const ConnectorResource* resource ) throw ( cms::CMSException ) {
+
+    try {
+
+        // A close notification is rejected once this producer is closed.
+        if( closed ) {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQProducer::onConnectorResourceClosed - "
+                "Producer Already Closed");
+        }
+
+        if( producerInfo != resource ) {
+            throw IllegalArgumentException(
+                __FILE__, __LINE__,
+                "ActiveMQProducer::onConnectorResourceClosed - "
+                "Unknown object passed to this callback");
+        }
+
+        // Our own ProducerInfo reported its close, so close this producer.
+        close();
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h?view=diff&rev=519474&r1=519473&r2=519474
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h Sat Mar 17 16:54:01 2007
@@ -22,7 +22,7 @@
#include <cms/Destination.h>
#include <cms/DeliveryMode.h>
-#include <activemq/core/ActiveMQSessionResource.h>
+#include <activemq/connector/ConnectorResourceListener.h>
#include <activemq/connector/ProducerInfo.h>
namespace activemq{
@@ -31,7 +31,7 @@
class ActiveMQSession;
class ActiveMQProducer : public cms::MessageProducer,
- public ActiveMQSessionResource
+ public connector::ConnectorResourceListener
{
private:
@@ -211,7 +211,7 @@
* this Session resource.
* @return pointer to a Connector Resource, can be NULL
*/
- virtual connector::ConnectorResource* getConnectorResource(void) {
+ virtual connector::ConnectorResource* getConnectorResource() {
return producerInfo;
}
@@ -221,9 +221,20 @@
* Retrives this object ProducerInfo pointer
* @return ProducerInfo pointer
*/
- virtual connector::ProducerInfo* getProducerInfo(void){
+ virtual connector::ProducerInfo* getProducerInfo(){
return producerInfo;
}
+
+ protected: // ConnectorResourceListener
+
+ /**
+ * Called when a Connector Resource is closed, so that this registered
+ * Listener can take the appropriate action.
+ * @param resource - The ConnectorResource that was closed.
+ * @throws CMSException
+ */
+ virtual void onConnectorResourceClosed(
+ const connector::ConnectorResource* resource ) throw ( cms::CMSException );
};
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?view=diff&rev=519474&r1=519473&r2=519474
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Sat Mar 17 16:54:01 2007
@@ -227,18 +227,20 @@
"ActiveMQSession::createConsumer - Session Already Closed" );
}
- // Create the consumer instance.
- ActiveMQConsumer* consumer = new ActiveMQConsumer(
+ ConsumerInfo* consumerInfo =
connection->getConnectionData()->getConnector()->
createConsumer( destination,
sessionInfo,
selector,
- noLocal ), this );
+ noLocal );
- // Add the consumer to the map of closeable session resources.
- synchronized( &closableSessionResources ) {
- closableSessionResources.add( consumer );
- }
+ // Add to Session Closeables and Monitor for close, if needed.
+ checkConnectorResource(
+ dynamic_cast<ConnectorResource*>( consumerInfo ) );
+
+ // Create the consumer instance.
+ ActiveMQConsumer* consumer = new ActiveMQConsumer(
+ consumerInfo, this );
// Register this consumer as a listener of messages from the
// connection.
@@ -250,7 +252,7 @@
connection->getConnectionData()->getConnector()->startConsumer(
consumer->getConsumerInfo() );
} catch( ActiveMQException& ex ) {
- this->onDestroySessionResource( consumer );
+ delete consumer;
ex.setMark( __FILE__, __LINE__ );
throw ex;
}
@@ -278,15 +280,18 @@
"ActiveMQSession::createDurableConsumer - Session Already Closed" );
}
- // Create the consumer instance.
- ActiveMQConsumer* consumer = new ActiveMQConsumer(
+ ConsumerInfo* consumerInfo =
connection->getConnectionData()->getConnector()->
- createDurableConsumer( destination, sessionInfo, name, selector, noLocal ), this );
+ createDurableConsumer(
+ destination, sessionInfo, name, selector, noLocal );
- // Add the consumer to the map of closeable session resources.
- synchronized( &closableSessionResources ) {
- closableSessionResources.add( consumer );
- }
+ // Add to Session Closeables and Monitor for close, if needed.
+ checkConnectorResource(
+ dynamic_cast<ConnectorResource*>( consumerInfo ) );
+
+ // Create the consumer instance.
+ ActiveMQConsumer* consumer = new ActiveMQConsumer(
+ consumerInfo, this );
// Register the consumer as a listener of messages from the
// connection.
@@ -298,7 +303,7 @@
connection->getConnectionData()->getConnector()->startConsumer(
consumer->getConsumerInfo() );
} catch( ActiveMQException& ex ) {
- this->onDestroySessionResource( consumer );
+ delete consumer;
ex.setMark( __FILE__, __LINE__ );
throw ex;
}
@@ -323,15 +328,17 @@
"ActiveMQSession::createProducer - Session Already Closed" );
}
- // Create the producer instance.
- ActiveMQProducer* producer = new ActiveMQProducer(
+ ProducerInfo* producerInfo =
connection->getConnectionData()->getConnector()->
- createProducer( destination, sessionInfo ), this );
+ createProducer( destination, sessionInfo );
- // Add the producer to the map of closeable session resources.
- synchronized( &closableSessionResources ) {
- closableSessionResources.add( producer );
- }
+ // Add to Session Closeables and Monitor for close, if needed.
+ checkConnectorResource(
+ dynamic_cast<ConnectorResource*>( producerInfo ) );
+
+ // Create the producer instance.
+ ActiveMQProducer* producer = new ActiveMQProducer(
+ producerInfo, this );
return producer;
}
@@ -352,8 +359,14 @@
"ActiveMQSession::createQueue - Session Already Closed" );
}
- return connection->getConnectionData()->
+ cms::Queue* queue = connection->getConnectionData()->
getConnector()->createQueue( queueName, sessionInfo );
+
+ // Add to Session Closeables and Monitor for close, if needed.
+ checkConnectorResource(
+ dynamic_cast<ConnectorResource*>( queue ) );
+
+ return queue;
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
@@ -372,8 +385,14 @@
"ActiveMQSession::createTopic - Session Already Closed");
}
- return connection->getConnectionData()->
+ cms::Topic* topic = connection->getConnectionData()->
getConnector()->createTopic( topicName, sessionInfo );
+
+ // Add to Session Closeables and Monitor for close, if needed.
+ checkConnectorResource(
+ dynamic_cast<ConnectorResource*>( topic ) );
+
+ return topic;
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
@@ -398,15 +417,9 @@
connection->getConnectionData()->
getConnector()->createTemporaryQueue( sessionInfo );
- // Check if this object is closeable, if so we add it to our map
- // of closeable resources so that it gets cleaned up.
- if( dynamic_cast<cms::Closeable*>( queue ) != NULL ) {
- // Add the consumer to the map of closeable session resources.
- synchronized( &closableSessionResources ) {
- closableSessionResources.add(
- dynamic_cast<cms::Closeable*>( queue ) );
- }
- }
+ // Add to Session Closeables and Monitor for close, if needed.
+ checkConnectorResource(
+ dynamic_cast<ConnectorResource*>( queue ) );
return queue;
}
@@ -433,15 +446,9 @@
connection->getConnectionData()->
getConnector()->createTemporaryTopic( sessionInfo );
- // Check if this object is closeable, if so we add it to our map
- // of closeable resources so that it gets cleaned up.
- if( dynamic_cast<cms::Closeable*>( topic ) != NULL ) {
- // Add the consumer to the map of closeable session resources.
- synchronized( &closableSessionResources ) {
- closableSessionResources.add(
- dynamic_cast<cms::Closeable*>( topic ) );
- }
- }
+ // Add to Session Closeables and Monitor for close, if needed.
+ checkConnectorResource(
+ dynamic_cast<ConnectorResource*>( topic ) );
return topic;
}
@@ -462,8 +469,14 @@
"ActiveMQSession::createMessage - Session Already Closed" );
}
- return connection->getConnectionData()->
+ cms::Message* message = connection->getConnectionData()->
getConnector()->createMessage( sessionInfo, transaction );
+
+ // Add to Session Closeables and Monitor for close, if needed.
+ checkConnectorResource(
+ dynamic_cast<ConnectorResource*>( message ) );
+
+ return message;
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
@@ -482,8 +495,14 @@
"ActiveMQSession::createBytesMessage - Session Already Closed" );
}
- return connection->getConnectionData()->
+ cms::BytesMessage* message = connection->getConnectionData()->
getConnector()->createBytesMessage( sessionInfo, transaction );
+
+ // Add to Session Closeables and Monitor for close, if needed.
+ checkConnectorResource(
+ dynamic_cast<ConnectorResource*>( message ) );
+
+ return message;
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
@@ -520,8 +539,14 @@
"ActiveMQSession::createTextMessage - Session Already Closed" );
}
- return connection->getConnectionData()->
+ cms::TextMessage* message = connection->getConnectionData()->
getConnector()->createTextMessage( sessionInfo, transaction );
+
+ // Add to Session Closeables and Monitor for close, if needed.
+ checkConnectorResource(
+ dynamic_cast<ConnectorResource*>( message ) );
+
+ return message;
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
@@ -556,9 +581,14 @@
"ActiveMQSession::createMapMessage - Session Already Closed" );
}
- return connection->
- getConnectionData()->
- getConnector()->createMapMessage( sessionInfo, transaction );
+ cms::MapMessage* message = connection->getConnectionData()->
+ getConnector()->createMapMessage( sessionInfo, transaction );
+
+ // Add to Session Closeables and Monitor for close, if needed.
+ checkConnectorResource(
+ dynamic_cast<ConnectorResource*>( message ) );
+
+ return message;
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
@@ -644,12 +674,11 @@
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::onDestroySessionResource(
- ActiveMQSessionResource* resource )
- throw ( cms::CMSException )
-{
- try
- {
+void ActiveMQSession::onConnectorResourceClosed(
+ const ConnectorResource* resource ) throw ( cms::CMSException ) {
+
+ try{
+
if( closed )
{
throw InvalidStateException(
@@ -657,34 +686,33 @@
"ActiveMQSession::onProducerClose - Session Already Closed");
}
- ActiveMQConsumer* consumer =
- dynamic_cast< ActiveMQConsumer*>( resource );
+ const ConsumerInfo* consumer =
+ dynamic_cast<const ConsumerInfo*>( resource );
+
+ if( consumer != NULL ) {
- if( consumer != NULL )
- {
// Remove this Consumer from the Connection
connection->removeMessageListener(
- consumer->getConsumerInfo()->getConsumerId() );
+ consumer->getConsumerId() );
// Remove this consumer from the Transaction if we are
// transactional
- if( transaction != NULL )
- {
- transaction->removeFromTransaction( consumer );
+ if( transaction != NULL ) {
+ transaction->removeFromTransaction(
+ consumer->getConsumerId() );
}
}
// Remove the entry from the session resource map if it's there
- cms::Closeable* closeableResource = dynamic_cast<cms::Closeable*>(resource);
- if( closeableResource != NULL ){
+ const cms::Closeable* closeable =
+ dynamic_cast<const cms::Closeable*>( resource );
+
+ if( closeable != NULL ){
synchronized( &closableSessionResources ) {
- closableSessionResources.remove( closeableResource );
+ closableSessionResources.remove(
+ const_cast<cms::Closeable*>( closeable ) );
}
}
-
- // Free its resources.
- connection->getConnectionData()->
- getConnector()->destroyResource( resource->getConnectorResource() );
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
@@ -838,4 +866,21 @@
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::checkConnectorResource(
+    connector::ConnectorResource* resource ) {
+
+    // A NULL resource is ignored; there is nothing to track.
+    if( resource != NULL ) {
+
+        // Record the resource among the closeable session resources.
+        synchronized( &closableSessionResources ) {
+            closableSessionResources.add( resource );
+        }
+
+        // Listen for the resource's close notification.
+        resource->addListener( this );
+    }
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h?view=diff&rev=519474&r1=519473&r2=519474
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h Sat Mar 17 16:54:01 2007
@@ -22,9 +22,9 @@
#include <activemq/concurrent/Runnable.h>
#include <activemq/concurrent/Mutex.h>
#include <activemq/connector/SessionInfo.h>
-#include <activemq/core/ActiveMQSessionResource.h>
#include <activemq/util/Set.h>
#include <activemq/util/Queue.h>
+#include <activemq/connector/ConnectorResourceListener.h>
#include <set>
namespace activemq{
@@ -39,7 +39,8 @@
class ActiveMQSession :
public cms::Session,
- public concurrent::Runnable
+ public concurrent::Runnable,
+ public connector::ConnectorResourceListener
{
private:
@@ -266,21 +267,21 @@
* @return transacted true - false.
*/
virtual bool isTransacted() const;
-
+
/**
- * Unsubscribes a durable subscription that has been created by a
+ * Unsubscribes a durable subscription that has been created by a
* client.
- *
- * This method deletes the state being maintained on behalf of the
- * subscriber by its provider. It is erroneous for a client to delete a
- * durable subscription while there is an active MessageConsumer or
- * Subscriber for the subscription, or while a consumed message is
- * part of a pending transaction or has not been acknowledged in the
+ *
+ * This method deletes the state being maintained on behalf of the
+ * subscriber by its provider. It is erroneous for a client to delete a
+ * durable subscription while there is an active MessageConsumer or
+ * Subscriber for the subscription, or while a consumed message is
+ * part of a pending transaction or has not been acknowledged in the
* session.
* @param name the name used to identify this subscription
* @throws CMSException
*/
- virtual void unsubscribe( const std::string& name )
+ virtual void unsubscribe( const std::string& name )
throw ( cms::CMSException );
public: // ActiveMQSession specific Methods
@@ -295,17 +296,6 @@
throw ( cms::CMSException );
/**
- * When a ActiveMQ core object is closed or destroyed it should call
- * back and let the session know that it is going away, this allows
- * the session to clean up any associated resources. This method
- * destroy's the data that is associated with a Producer object
- * @param The Producer that is being destoryed
- * @throw CMSException
- */
- virtual void onDestroySessionResource( ActiveMQSessionResource* resource )
- throw ( cms::CMSException );
-
- /**
* Called to acknowledge the receipt of a message.
* @param The consumer that received the message
* @param The Message to acknowledge.
@@ -333,6 +323,17 @@
return sessionInfo;
}
+ protected: // ConnectorResourceListener
+
+ /**
+ * Called when a Connector Resource is closed, so that this registered
+ * Listener can take the appropriate action.
+ * @param resource - The ConnectorResource that was closed.
+ * @throws CMSException
+ */
+ virtual void onConnectorResourceClosed(
+ const connector::ConnectorResource* resource ) throw ( cms::CMSException );
+
protected:
/**
@@ -361,6 +362,18 @@
* result of a rollback, or of the consumer being shutdown.
*/
virtual void purgeMessages() throw ( exceptions::ActiveMQException );
+
+ /**
+ * Given a ConnectorResource pointer, this method will add it to the
+ * closeable resources that this session must close on shutdown
+ * and register itself as a ConnectorResourceListener so that it
+ * can be told when the resource has been closed by someone else
+ * and remove it from its closeable resources.
+ * @param resource - ConnectorResource to monitor, if NULL no action
+ * is taken and no exception is thrown.
+ */
+ virtual void checkConnectorResource(
+ connector::ConnectorResource* resource );
};
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp?view=diff&rev=519474&r1=519473&r2=519474
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp Sat Mar 17 16:54:01 2007
@@ -105,8 +105,8 @@
if( transactionInfo != NULL )
{
// Dispose of the ProducerInfo
- connection->getConnectionData()->
- getConnector()->destroyResource( transactionInfo );
+ transactionInfo->close();
+ delete transactionInfo;
}
synchronized( &rollbackLock )
@@ -176,6 +176,32 @@
}
////////////////////////////////////////////////////////////////////////////////
+void ActiveMQTransaction::removeFromTransaction( long long consumerId ) {
+
+    try {
+
+        // Find the consumer registered in the rollback map under this id
+        // and remove it through the consumer-pointer overload.
+        synchronized( &rollbackLock )
+        {
+            RollbackMap::iterator iter = rollbackMap.begin();
+            // Visit every entry; an id with no match changes nothing.
+            for( ; iter != rollbackMap.end(); ++iter ) {
+
+                long long id = iter->first->getConsumerInfo()->getConsumerId();
+
+                if( id == consumerId ) {
+                    removeFromTransaction( iter->first );
+                    return;
+                }
+            }
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
void ActiveMQTransaction::commit(void) throw ( exceptions::ActiveMQException )
{
try
@@ -220,9 +246,9 @@
connection->getConnectionData()->getConnector()->
rollback( transactionInfo, session->getSessionInfo() );
- // Dispose of the ProducerInfo
- connection->getConnectionData()->
- getConnector()->destroyResource( transactionInfo );
+ // Dispose of the TransactionInfo
+ transactionInfo->close();
+ delete transactionInfo;
// Start a new Transaction
transactionInfo = connection->getConnectionData()->
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.h?view=diff&rev=519474&r1=519473&r2=519474
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.h Sat Mar 17 16:54:01 2007
@@ -30,7 +30,6 @@
#include <activemq/exceptions/InvalidStateException.h>
#include <activemq/exceptions/IllegalArgumentException.h>
#include <activemq/util/Properties.h>
-#include <activemq/core/ActiveMQSessionResource.h>
namespace activemq{
namespace core{
@@ -57,8 +56,7 @@
* rolled back more than this many time, the message is dropped.
*/
class ActiveMQTransaction : public concurrent::TaskListener,
- public connector::TransactionInfo,
- public ActiveMQSessionResource
+ public connector::TransactionInfo
{
private:
@@ -110,7 +108,7 @@
ActiveMQSession* session,
const util::Properties& properties );
- virtual ~ActiveMQTransaction(void);
+ virtual ~ActiveMQTransaction();
/**
* Adds the Message as a part of the Transaction for the specified
@@ -124,22 +122,30 @@
/**
* Removes the ActiveMQConsumer and all of its transacted
* messages from the Transaction, this is usually only done when
- * a ActiveMQConsumer is destroyed.
+ * an ActiveMQConsumer is destroyed.
* @param listener - consumer who is to be removed.
*/
virtual void removeFromTransaction( ActiveMQConsumer* listener );
/**
+ * Removes the ActiveMQConsumer registered under the given consumer
+ * id, and all of its transacted messages, from the Transaction; this
+ * is usually only done when an ActiveMQConsumer is destroyed.
+ * @param consumerId - id of the consumer who is to be removed.
+ */
+ virtual void removeFromTransaction( long long consumerId );
+
+ /**
* Commit the current Transaction
* @throw CMSException
*/
- virtual void commit(void) throw ( exceptions::ActiveMQException );
+ virtual void commit() throw ( exceptions::ActiveMQException );
/**
* Rollback the current Transaction
* @throw CMSException
*/
- virtual void rollback(void) throw ( exceptions::ActiveMQException );
+ virtual void rollback() throw ( exceptions::ActiveMQException );
/**
* Get the Transaction Information object for the current
@@ -156,7 +162,7 @@
* Gets the Transction Id
* @return integral value of Id
*/
- virtual long long getTransactionId(void) const {
+ virtual long long getTransactionId() const {
return transactionInfo->getTransactionId();
}
@@ -225,7 +231,7 @@
* well.
* @throw ActiveMQException
*/
- virtual void clearTransaction(void);
+ virtual void clearTransaction();
private:
@@ -272,7 +278,7 @@
}
// Dispatches the Messages to the Consumer.
- virtual void run(void);
+ virtual void run();
};
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/TemporaryQueue.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/TemporaryQueue.h?view=diff&rev=519474&r1=519473&r2=519474
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/TemporaryQueue.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/TemporaryQueue.h Sat Mar 17 16:54:01 2007
@@ -36,9 +36,15 @@
* Gets the name of this queue.
* @return The queue name.
*/
- virtual std::string getQueueName() const
+ virtual std::string getQueueName() const
throw( CMSException ) = 0;
-
+
+ /**
+ * Destroys the Temp Destination at the Broker
+ * @throws CMSException
+ */
+ virtual void destroy() throw ( CMSException ) = 0;
+
};
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/TemporaryTopic.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/TemporaryTopic.h?view=diff&rev=519474&r1=519473&r2=519474
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/TemporaryTopic.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/TemporaryTopic.h Sat Mar 17 16:54:01 2007
@@ -36,9 +36,15 @@
* Gets the name of this topic.
* @return The topic name.
*/
- virtual std::string getTopicName()
+ virtual std::string getTopicName()
const throw( CMSException ) = 0;
-
+
+ /**
+ * Destroys the Temp Destination at the Broker
+ * @throws CMSException
+ */
+ virtual void destroy() throw ( CMSException ) = 0;
+
};
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTempDestinationTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTempDestinationTest.cpp?view=diff&rev=519474&r1=519473&r2=519474
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTempDestinationTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTempDestinationTest.cpp Sat Mar 17 16:54:01 2007
@@ -19,7 +19,7 @@
#include <integration/IntegrationCommon.h>
-//CPPUNIT_TEST_SUITE_REGISTRATION( integration::connector::openwire::OpenwireTempDestinationTest );
+CPPUNIT_TEST_SUITE_REGISTRATION( integration::connector::openwire::OpenwireTempDestinationTest );
#include <activemq/concurrent/Thread.h>
#include <activemq/concurrent/Mutex.h>
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompConnectorTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompConnectorTest.h?view=diff&rev=519474&r1=519473&r2=519474
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompConnectorTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompConnectorTest.h Sat Mar 17 16:54:01 2007
@@ -127,10 +127,10 @@
CPPUNIT_ASSERT( info4->getAckMode() == cms::Session::SESSION_TRANSACTED );
CPPUNIT_ASSERT( info4->getConnectionId() == connectionId );
- connector->destroyResource( info1 );
- connector->destroyResource( info2 );
- connector->destroyResource( info3 );
- connector->destroyResource( info4 );
+ delete info1;
+ delete info2;
+ delete info3;
+ delete info4;
// Delete the connector here - this assures the propery order
// of destruction.
@@ -182,15 +182,15 @@
CPPUNIT_ASSERT( cinfo4->getDestination()->toProviderString() == dest4.toProviderString() );
CPPUNIT_ASSERT( cinfo4->getMessageSelector() == sel4 );
- connector->destroyResource( cinfo1 );
- connector->destroyResource( cinfo2 );
- connector->destroyResource( cinfo3 );
- connector->destroyResource( cinfo4 );
-
- connector->destroyResource( info1 );
- connector->destroyResource( info2 );
- connector->destroyResource( info3 );
- connector->destroyResource( info4 );
+ delete cinfo1;
+ delete cinfo2;
+ delete cinfo3;
+ delete cinfo4;
+
+ delete info1;
+ delete info2;
+ delete info3;
+ delete info4;
// Delete the connector here - this assures the propery order
// of destruction.
@@ -234,15 +234,15 @@
CPPUNIT_ASSERT( pinfo4->getSessionInfo() == info4 );
CPPUNIT_ASSERT( pinfo4->getDestination()->toProviderString() == dest4.toProviderString() );
- connector->destroyResource( pinfo1 );
- connector->destroyResource( pinfo2 );
- connector->destroyResource( pinfo3 );
- connector->destroyResource( pinfo4 );
-
- connector->destroyResource( info1 );
- connector->destroyResource( info2 );
- connector->destroyResource( info3 );
- connector->destroyResource( info4 );
+ delete pinfo1;
+ delete pinfo2;
+ delete pinfo3;
+ delete pinfo4;
+
+ delete info1;
+ delete info2;
+ delete info3;
+ delete info4;
// Delete the connector here - this assures the propery order
// of destruction.
@@ -316,15 +316,15 @@
listener.consumers[ix] == cinfo4 );
}
- connector->destroyResource( cinfo1 );
- connector->destroyResource( cinfo2 );
- connector->destroyResource( cinfo3 );
- connector->destroyResource( cinfo4 );
-
- connector->destroyResource( info1 );
- connector->destroyResource( info2 );
- connector->destroyResource( info3 );
- connector->destroyResource( info4 );
+ delete cinfo1;
+ delete cinfo2;
+ delete cinfo3;
+ delete cinfo4;
+
+ delete info1;
+ delete info2;
+ delete info3;
+ delete info4;
// Delete the connector here - this assures the propery order
// of destruction.
@@ -358,16 +358,16 @@
cmdListener.cmd = NULL;
- connector->destroyResource( cinfo1 );
+ delete cinfo1;
CPPUNIT_ASSERT( cmdListener.cmd == NULL );
cmdListener.cmd = NULL;
- connector->destroyResource( cinfo2 );
+ delete cinfo2;
CPPUNIT_ASSERT( cmdListener.cmd != NULL );
- connector->destroyResource( info1 );
- connector->destroyResource( info2 );
+ delete info1;
+ delete info2;
delete connector;
CPPUNIT_ASSERT( cmdListener.cmd != NULL );
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompSessionManagerTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompSessionManagerTest.h?view=diff&rev=519474&r1=519473&r2=519474
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompSessionManagerTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompSessionManagerTest.h Sat Mar 17 16:54:01 2007
@@ -23,6 +23,7 @@
#include <activemq/connector/stomp/StompSessionManager.h>
#include <activemq/connector/stomp/StompResponseBuilder.h>
+#include <activemq/connector/stomp/StompConnector.h>
#include <activemq/connector/stomp/StompTopic.h>
#include <activemq/connector/stomp/StompQueue.h>
#include <activemq/transport/DummyTransport.h>
@@ -109,9 +110,14 @@
void testSessions()
{
std::string connectionId = "testConnectionId";
- StompResponseBuilder responseBuilder("testSessionId");
+ StompResponseBuilder responseBuilder("testConnectionId");
transport::DummyTransport transport( &responseBuilder );
- StompSessionManager manager( connectionId, &transport );
+ util::SimpleProperties properties;
+
+ // Using a pointer for the connector so we ensure the proper destruction
+ // order of objects - connector before the transport.
+ StompConnector* connector = new StompConnector( &transport, properties );
+ StompSessionManager manager( connectionId, NULL, &transport );
SessionInfo* info1 = manager.createSession( cms::Session::AUTO_ACKNOWLEDGE );
CPPUNIT_ASSERT( info1->getAckMode() == cms::Session::AUTO_ACKNOWLEDGE );
@@ -138,9 +144,14 @@
void testConsumers()
{
std::string connectionId = "testConnectionId";
- StompResponseBuilder responseBuilder("testSessionId");
+ StompResponseBuilder responseBuilder("testConnectionId");
transport::DummyTransport transport( &responseBuilder );
- StompSessionManager manager( connectionId, &transport );
+ util::SimpleProperties properties;
+
+ // Using a pointer for the connector so we ensure the proper destruction
+ // order of objects - connector before the transport.
+ StompConnector* connector = new StompConnector( &transport, properties );
+ StompSessionManager manager( connectionId, NULL, &transport );
SessionInfo* info1 = manager.createSession( cms::Session::AUTO_ACKNOWLEDGE );
std::string sel1 = "";
@@ -188,9 +199,14 @@
void testCommand()
{
std::string connectionId = "testConnectionId";
- StompResponseBuilder responseBuilder("testSessionId");
+ StompResponseBuilder responseBuilder("testConnectionId");
transport::DummyTransport transport( &responseBuilder );
- StompSessionManager manager( connectionId, &transport );
+ util::SimpleProperties properties;
+
+ // Using a pointer for the connector so we ensure the proper destruction
+ // order of objects - connector before the transport.
+ StompConnector* connector = new StompConnector( &transport, properties );
+ StompSessionManager manager( connectionId, NULL, &transport );
StompTopic dest1( "dummy.topic" );
StompTopic dest2( "dummy.topic2" );
@@ -248,12 +264,15 @@
void testSendingCommands(){
-
-
std::string connectionId = "testConnectionId";
- StompResponseBuilder responseBuilder("testSessionId");
+ StompResponseBuilder responseBuilder("testConnectionId");
transport::DummyTransport transport( &responseBuilder );
- StompSessionManager manager( connectionId, &transport );
+ util::SimpleProperties properties;
+
+ // Using a pointer for the connector so we ensure the proper destruction
+ // order of objects - connector before the transport.
+ StompConnector* connector = new StompConnector( &transport, properties );
+ StompSessionManager manager( connectionId, NULL, &transport );
StompTopic dest1( "dummy.topic.1" );
@@ -290,9 +309,14 @@
void testSubscribeOptions(){
std::string connectionId = "testConnectionId";
- StompResponseBuilder responseBuilder("testSessionId");
+ StompResponseBuilder responseBuilder("testConnectionId");
transport::DummyTransport transport( &responseBuilder );
- StompSessionManager manager( connectionId, &transport );
+ util::SimpleProperties properties;
+
+ // Using a pointer for the connector so we ensure the proper destruction
+ // order of objects - connector before the transport.
+ StompConnector* connector = new StompConnector( &transport, properties );
+ StompSessionManager manager( connectionId, NULL, &transport );
MyProperty retroactive =
std::make_pair( "activemq.retroactive", "true" );