You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2008/05/02 01:39:55 UTC
svn commit: r652713 - in
/activemq/activemq-cpp/trunk/src/test-integration/integration/connector:
openwire/OpenwireExpirationTest.cpp openwire/OpenwireExpirationTest.h
stomp/ExpirationTest.cpp stomp/ExpirationTest.h
Author: tabish
Date: Thu May 1 16:39:54 2008
New Revision: 652713
URL: http://svn.apache.org/viewvc?rev=652713&view=rev
Log:
Fixing some issues with the tests: the OpenWire test wasn't actually talking OpenWire, etc.
Modified:
activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireExpirationTest.cpp
activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireExpirationTest.h
activemq/activemq-cpp/trunk/src/test-integration/integration/connector/stomp/ExpirationTest.cpp
activemq/activemq-cpp/trunk/src/test-integration/integration/connector/stomp/ExpirationTest.h
Modified: activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireExpirationTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireExpirationTest.cpp?rev=652713&r1=652712&r2=652713&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireExpirationTest.cpp (original)
+++ activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireExpirationTest.cpp Thu May 1 16:39:54 2008
@@ -72,202 +72,246 @@
using namespace decaf::util;
using namespace decaf::util::concurrent;
+////////////////////////////////////////////////////////////////////////////////
+namespace integration {
+namespace connector {
+namespace openwire{
+
+ class Producer : public decaf::lang::Runnable {
+ private:
+
+ cms::Connection* connection;
+ cms::Session* session;
+ cms::Destination* destination;
+ cms::MessageProducer* producer;
+ int numMessages;
+ long long timeToLive;
+ bool disableTimeStamps;
+ std::string topic;
+
+ public:
+
+ Producer( std::string topic, int numMessages,
+ long long timeToLive ){
+
+ this->connection = NULL;
+ this->session = NULL;
+ this->destination = NULL;
+ this->producer = NULL;
+ this->numMessages = numMessages;
+ this->timeToLive = timeToLive;
+ this->disableTimeStamps = false;
+ this->topic = topic;
+ }
+
+ virtual ~Producer(){
+ cleanup();
+ }
+
+ virtual bool getDisableTimeStamps() const {
+ return this->disableTimeStamps;
+ }
+
+ virtual void setDisableTimeStamps( bool value ){
+ this->disableTimeStamps = value;
+ }
+
+ virtual void run() {
+ try {
+ // Create a ConnectionFactory
+ ActiveMQConnectionFactory* connectionFactory =
+ new ActiveMQConnectionFactory(
+ IntegrationCommon::getInstance().getOpenwireURL() );
+
+ // Create a Connection
+ connection = connectionFactory->createConnection();
+ delete connectionFactory;
+ connection->start();
+
+ string sss=connection->getClientID();
+ cout << sss << endl;
+
+ session = connection->createSession( Session::AUTO_ACKNOWLEDGE);
+ destination = session->createQueue( topic );
+
+ producer = session->createProducer( destination );
+ producer->setDeliveryMode( DeliveryMode::PERSISTENT );
+ producer->setDisableMessageTimeStamp( disableTimeStamps );
+
+ //unsigned long ttt=getcurt();
+ producer->setTimeToLive( 1 );
+
+ // Create the Thread Id String
+ string threadIdStr = Integer::toString( Thread::getId() );
+
+ // Create a messages
+ string text = (string)"Hello world! from thread " + threadIdStr;
+
+ for( int ix=0; ix<numMessages; ++ix ){
+ TextMessage* message = session->createTextMessage( text );
+ producer->send( message );
+ delete message;
+ }
+
+ } catch ( CMSException& e ) {
+ e.printStackTrace();
+ }
+ }
+
+ private:
+
+ void cleanup(){
+
+ // Destroy resources.
+ try{
+ if( destination != NULL ) delete destination;
+ }catch ( CMSException& e ) {}
+ destination = NULL;
+
+ try{
+ if( producer != NULL ) delete producer;
+ }catch ( CMSException& e ) {}
+ producer = NULL;
+
+ // Close open resources.
+ try{
+ if( session != NULL ) session->close();
+ if( connection != NULL ) connection->close();
+ }catch ( CMSException& e ) {}
+
+ try{
+ if( session != NULL ) delete session;
+ }catch ( CMSException& e ) {}
+ session = NULL;
+
+ try{
+ if( connection != NULL ) delete connection;
+ }catch ( CMSException& e ) {}
+ connection = NULL;
+ }
+
+ };
+
+ class Consumer : public cms::MessageListener, public decaf::lang::Runnable {
+ private:
+
+ cms::Connection* connection;
+ cms::Session* session;
+ cms::Destination* destination;
+ cms::MessageConsumer* consumer;
+ long waitMillis;
+ int numReceived;
+ std::string topic;
+
+ public:
+
+ Consumer( std::string topic, long waitMillis ){
+ connection = NULL;
+ session = NULL;
+ destination = NULL;
+ consumer = NULL;
+ this->waitMillis = waitMillis;
+ numReceived = 0;
+ this->topic = topic;
+ }
+
+ virtual ~Consumer(){
+ cleanup();
+ }
+
+ virtual int getNumReceived() const{
+ return numReceived;
+ }
+
+ virtual void run(){
+
+ try {
+
+ string user,passwd,sID;
+ user="default";
+ passwd="";
+ sID="lsgID";
+
+ // Create a Connection
+ connection = ActiveMQConnectionFactory::createConnection(
+ IntegrationCommon::getInstance().getOpenwireURL(), user, passwd, sID );
+
+ connection->start();
+
+ // Create a Session
+ session = connection->createSession( Session::AUTO_ACKNOWLEDGE);
+
+ // Create the destination (Topic or Queue)
+ string t = topic + "?consumer.retroactive=true";
+
+ destination = session->createQueue( t );
+
+ consumer = session->createConsumer( destination );
+
+ consumer->setMessageListener( this );
+
+ // Sleep while asynchronous messages come in.
+ Thread::sleep( waitMillis );
+
+ } catch (CMSException& e) {
+ e.printStackTrace();
+ }
+ }
+
+ virtual void onMessage( const cms::Message* message ){
+
+ try{
+ const TextMessage* textMessage =
+ dynamic_cast< const TextMessage* >( message );
+ textMessage->getText();
+ numReceived++;
+ } catch (CMSException& e) {
+ e.printStackTrace();
+ }
+ }
+
+ private:
+
+ void cleanup(){
+
+ // Destroy resources.
+ try{
+ if( destination != NULL ) delete destination;
+ }catch (CMSException& e) {}
+ destination = NULL;
+
+ try{
+ if( consumer != NULL ) delete consumer;
+ }catch (CMSException& e) {}
+ consumer = NULL;
+
+ // Close open resources.
+ try{
+ if( session != NULL ) session->close();
+ if( connection != NULL ) connection->close();
+ }catch (CMSException& e) {}
+
+ try{
+ if( session != NULL ) delete session;
+ }catch (CMSException& e) {}
+ session = NULL;
+
+ try{
+ if( connection != NULL ) delete connection;
+ }catch (CMSException& e) {}
+ connection = NULL;
+ }
+ };
+
+}}}
+
using namespace std;
using namespace integration;
using namespace integration::connector::openwire;
-OpenwireExpirationTest::Producer::Producer( string topic, int numMessages, long long timeToLive ){
- connection = NULL;
- session = NULL;
- destination = NULL;
- producer = NULL;
- this->numMessages = numMessages;
- this->timeToLive = timeToLive;
- this->disableTimeStamps = false;
- this->topic = topic;
-}
-
-OpenwireExpirationTest::Producer::~Producer(){
- cleanup();
-}
-
-bool OpenwireExpirationTest::Producer::getDisableTimeStamps() const {
- return disableTimeStamps;
-}
-
-void OpenwireExpirationTest::Producer::setDisableTimeStamps( bool value ) {
- this->disableTimeStamps = value;
-}
-
-void OpenwireExpirationTest::Producer::run() {
- try {
- // Create a ConnectionFactory
- ActiveMQConnectionFactory* connectionFactory =
- new ActiveMQConnectionFactory(
- IntegrationCommon::getInstance().getStompURL() );
-
- // Create a Connection
- connection = connectionFactory->createConnection();
- delete connectionFactory;
- connection->start();
-
- string sss=connection->getClientID();
- cout << sss << endl;
-
- session = connection->createSession( Session::AUTO_ACKNOWLEDGE);
- destination = session->createTopic( topic );
-
- producer = session->createProducer( destination );
- producer->setDeliveryMode( DeliveryMode::PERSISTENT );
- producer->setDisableMessageTimeStamp( disableTimeStamps );
-
- //unsigned long ttt=getcurt();
- producer->setTimeToLive( 1);
-
- // Create the Thread Id String
- string threadIdStr = Integer::toString( Thread::getId() );
-
- // Create a messages
- string text = (string)"Hello world! from thread " + threadIdStr;
-
- for( int ix=0; ix<numMessages; ++ix ){
- TextMessage* message = session->createTextMessage( text );
- producer->send( message );
- delete message;
- }
-
- }catch ( CMSException& e ) {
- e.printStackTrace();
- }
- }
-
-void OpenwireExpirationTest::Producer::cleanup(){
-
- // Destroy resources.
- try{
- if( destination != NULL ) delete destination;
- }catch ( CMSException& e ) {}
- destination = NULL;
-
- try{
- if( producer != NULL ) delete producer;
- }catch ( CMSException& e ) {}
- producer = NULL;
-
- // Close open resources.
- try{
- if( session != NULL ) session->close();
- if( connection != NULL ) connection->close();
- }catch ( CMSException& e ) {}
-
- try{
- if( session != NULL ) delete session;
- }catch ( CMSException& e ) {}
- session = NULL;
-
- try{
- if( connection != NULL ) delete connection;
- }catch ( CMSException& e ) {}
- connection = NULL;
-}
-
-OpenwireExpirationTest::Consumer::Consumer( string topic, long waitMillis ){
- connection = NULL;
- session = NULL;
- destination = NULL;
- consumer = NULL;
- this->waitMillis = waitMillis;
- numReceived = 0;
- this->topic = topic;
-}
-
-OpenwireExpirationTest::Consumer::~Consumer(){
- cleanup();
-}
-
-int OpenwireExpirationTest::Consumer::getNumReceived() const{
- return numReceived;
-}
-
-void OpenwireExpirationTest::Consumer::run() {
-
- try {
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireExpirationTest::testExpired() {
- string user,passwd,sID;
- user="default";
- passwd="";
- sID="lsgID";
-
- // Create a Connection
- connection = ActiveMQConnectionFactory::createConnection(
- IntegrationCommon::getInstance().getStompURL(), user, passwd, sID );
- connection->start();
-
- // Create a Session
- session = connection->createSession( Session::AUTO_ACKNOWLEDGE);
-
- // Create the destination (Topic or Queue)
- string t = topic + "?consumer.retroactive=true";
-
- destination = session->createTopic( t );
-
- consumer = session->createConsumer( destination );
-
- consumer->setMessageListener( this );
-
- // Sleep while asynchronous messages come in.
- Thread::sleep( waitMillis );
-
- } catch (CMSException& e) {
- e.printStackTrace();
- }
-}
-
-void OpenwireExpirationTest::Consumer::onMessage( const Message* message ){
-
- try
- {
- const TextMessage* textMessage =
- dynamic_cast< const TextMessage* >( message );
- string text = textMessage->getText();
- numReceived++;
- } catch (CMSException& e) {
- e.printStackTrace();
- }
-}
-
-void OpenwireExpirationTest::Consumer::cleanup(){
-
- // Destroy resources.
- try{
- if( destination != NULL ) delete destination;
- }catch (CMSException& e) {}
- destination = NULL;
-
- try{
- if( consumer != NULL ) delete consumer;
- }catch (CMSException& e) {}
- consumer = NULL;
-
- // Close open resources.
- try{
- if( session != NULL ) session->close();
- if( connection != NULL ) connection->close();
- }catch (CMSException& e) {}
-
- try{
- if( session != NULL ) delete session;
- }catch (CMSException& e) {}
- session = NULL;
-
- try{
- if( connection != NULL ) delete connection;
- }catch (CMSException& e) {}
- connection = NULL;
-}
-
-void OpenwireExpirationTest::testExpired()
-{
string topic = UUID::randomUUID().toString();
Producer producer( topic, 1, 1 );
Thread producerThread( &producer );
@@ -286,8 +330,9 @@
CPPUNIT_ASSERT_EQUAL( 0, consumer.getNumReceived() );
}
-void OpenwireExpirationTest::testNotExpired()
-{
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireExpirationTest::testNotExpired() {
+
string topic = UUID::randomUUID().toString();
Producer producer( topic, 2, 2000 );
producer.setDisableTimeStamps( true );
@@ -300,8 +345,5 @@
consumerThread.start();
consumerThread.join();
- Thread::sleep( 50 );
-
CPPUNIT_ASSERT_EQUAL( 2, consumer.getNumReceived() );
}
-
Modified: activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireExpirationTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireExpirationTest.h?rev=652713&r1=652712&r2=652713&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireExpirationTest.h (original)
+++ activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireExpirationTest.h Thu May 1 16:39:54 2008
@@ -28,90 +28,28 @@
#include <cms/Session.h>
#include <cms/MessageProducer.h>
-#include <decaf/lang/Runnable.h>
-
namespace integration{
namespace connector{
namespace openwire{
- class OpenwireExpirationTest : public CppUnit::TestFixture
- {
+ class OpenwireExpirationTest : public CppUnit::TestFixture {
CPPUNIT_TEST_SUITE( OpenwireExpirationTest );
CPPUNIT_TEST( testExpired );
CPPUNIT_TEST( testNotExpired );
CPPUNIT_TEST_SUITE_END();
public:
-
+
static std::string messageTag;
-
+
public:
-
+
OpenwireExpirationTest(){}
virtual ~OpenwireExpirationTest(){}
-
+
virtual void testExpired();
virtual void testNotExpired();
-
- public:
-
- class Producer : public decaf::lang::Runnable {
- private:
-
- cms::Connection* connection;
- cms::Session* session;
- cms::Topic* destination;
- cms::MessageProducer* producer;
- int numMessages;
- long long timeToLive;
- bool disableTimeStamps;
- std::string topic;
-
- public:
-
- Producer( std::string topic, int numMessages, long long timeToLive );
-
- virtual ~Producer();
-
- virtual bool getDisableTimeStamps() const;
-
- virtual void setDisableTimeStamps( bool value );
-
- virtual void run();
-
- private:
-
- void cleanup();
- };
-
- class Consumer : public cms::MessageListener, public decaf::lang::Runnable {
-
- private:
-
- cms::Connection* connection;
- cms::Session* session;
- cms::Topic* destination;
- cms::MessageConsumer* consumer;
- long waitMillis;
- int numReceived;
- std::string topic;
-
- public:
-
- Consumer( std::string topic, long waitMillis );
-
- virtual ~Consumer();
-
- virtual int getNumReceived() const;
-
- virtual void run();
-
- virtual void onMessage( const cms::Message* message );
-
- private:
-
- void cleanup();
- };
+
};
}}}
Modified: activemq/activemq-cpp/trunk/src/test-integration/integration/connector/stomp/ExpirationTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test-integration/integration/connector/stomp/ExpirationTest.cpp?rev=652713&r1=652712&r2=652713&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test-integration/integration/connector/stomp/ExpirationTest.cpp (original)
+++ activemq/activemq-cpp/trunk/src/test-integration/integration/connector/stomp/ExpirationTest.cpp Thu May 1 16:39:54 2008
@@ -39,6 +39,8 @@
#include <activemq/core/ActiveMQProducer.h>
#include <decaf/util/StringTokenizer.h>
#include <decaf/lang/Boolean.h>
+#include <decaf/lang/Runnable.h>
+#include <decaf/util/concurrent/CountDownLatch.h>
#include <cms/Connection.h>
#include <cms/MessageConsumer.h>
@@ -74,202 +76,246 @@
using namespace decaf::lang;
using namespace decaf::util;
+////////////////////////////////////////////////////////////////////////////////
+namespace integration {
+namespace connector {
+namespace stomp{
+
+ class Producer : public decaf::lang::Runnable {
+ private:
+
+ cms::Connection* connection;
+ cms::Session* session;
+ cms::Destination* destination;
+ cms::MessageProducer* producer;
+ int numMessages;
+ long long timeToLive;
+ bool disableTimeStamps;
+ std::string topic;
+
+ public:
+
+ Producer( std::string topic, int numMessages,
+ long long timeToLive ){
+
+ this->connection = NULL;
+ this->session = NULL;
+ this->destination = NULL;
+ this->producer = NULL;
+ this->numMessages = numMessages;
+ this->timeToLive = timeToLive;
+ this->disableTimeStamps = false;
+ this->topic = topic;
+ }
+
+ virtual ~Producer(){
+ cleanup();
+ }
+
+ virtual bool getDisableTimeStamps() const {
+ return this->disableTimeStamps;
+ }
+
+ virtual void setDisableTimeStamps( bool value ){
+ this->disableTimeStamps = value;
+ }
+
+ virtual void run() {
+ try {
+ // Create a ConnectionFactory
+ ActiveMQConnectionFactory* connectionFactory =
+ new ActiveMQConnectionFactory(
+ IntegrationCommon::getInstance().getStompURL() );
+
+ // Create a Connection
+ connection = connectionFactory->createConnection();
+ delete connectionFactory;
+ connection->start();
+
+ string sss=connection->getClientID();
+ cout << sss << endl;
+
+ session = connection->createSession( Session::AUTO_ACKNOWLEDGE);
+ destination = session->createQueue( topic );
+
+ producer = session->createProducer( destination );
+ producer->setDeliveryMode( DeliveryMode::PERSISTENT );
+ producer->setDisableMessageTimeStamp( disableTimeStamps );
+
+ //unsigned long ttt=getcurt();
+ producer->setTimeToLive( 1 );
+
+ // Create the Thread Id String
+ string threadIdStr = Integer::toString( Thread::getId() );
+
+ // Create a messages
+ string text = (string)"Hello world! from thread " + threadIdStr;
+
+ for( int ix=0; ix<numMessages; ++ix ){
+ TextMessage* message = session->createTextMessage( text );
+ producer->send( message );
+ delete message;
+ }
+
+ } catch ( CMSException& e ) {
+ e.printStackTrace();
+ }
+ }
+
+ private:
+
+ void cleanup(){
+
+ // Destroy resources.
+ try{
+ if( destination != NULL ) delete destination;
+ }catch ( CMSException& e ) {}
+ destination = NULL;
+
+ try{
+ if( producer != NULL ) delete producer;
+ }catch ( CMSException& e ) {}
+ producer = NULL;
+
+ // Close open resources.
+ try{
+ if( session != NULL ) session->close();
+ if( connection != NULL ) connection->close();
+ }catch ( CMSException& e ) {}
+
+ try{
+ if( session != NULL ) delete session;
+ }catch ( CMSException& e ) {}
+ session = NULL;
+
+ try{
+ if( connection != NULL ) delete connection;
+ }catch ( CMSException& e ) {}
+ connection = NULL;
+ }
+
+ };
+
+ class Consumer : public cms::MessageListener, public decaf::lang::Runnable {
+ private:
+
+ cms::Connection* connection;
+ cms::Session* session;
+ cms::Destination* destination;
+ cms::MessageConsumer* consumer;
+ long waitMillis;
+ int numReceived;
+ std::string topic;
+
+ public:
+
+ Consumer( std::string topic, long waitMillis ){
+ connection = NULL;
+ session = NULL;
+ destination = NULL;
+ consumer = NULL;
+ this->waitMillis = waitMillis;
+ numReceived = 0;
+ this->topic = topic;
+ }
+
+ virtual ~Consumer(){
+ cleanup();
+ }
+
+ virtual int getNumReceived() const{
+ return numReceived;
+ }
+
+ virtual void run(){
+
+ try {
+
+ string user,passwd,sID;
+ user="default";
+ passwd="";
+ sID="lsgID";
+
+ // Create a Connection
+ connection = ActiveMQConnectionFactory::createConnection(
+ IntegrationCommon::getInstance().getStompURL(), user, passwd, sID );
+
+ connection->start();
+
+ // Create a Session
+ session = connection->createSession( Session::AUTO_ACKNOWLEDGE);
+
+ // Create the destination (Topic or Queue)
+ string t = topic + "?consumer.retroactive=true";
+
+ destination = session->createQueue( t );
+
+ consumer = session->createConsumer( destination );
+
+ consumer->setMessageListener( this );
+
+ // Sleep while asynchronous messages come in.
+ Thread::sleep( waitMillis );
+
+ } catch (CMSException& e) {
+ e.printStackTrace();
+ }
+ }
+
+ virtual void onMessage( const cms::Message* message ){
+
+ try{
+ const TextMessage* textMessage =
+ dynamic_cast< const TextMessage* >( message );
+ textMessage->getText();
+ numReceived++;
+ } catch (CMSException& e) {
+ e.printStackTrace();
+ }
+ }
+
+ private:
+
+ void cleanup(){
+
+ // Destroy resources.
+ try{
+ if( destination != NULL ) delete destination;
+ }catch (CMSException& e) {}
+ destination = NULL;
+
+ try{
+ if( consumer != NULL ) delete consumer;
+ }catch (CMSException& e) {}
+ consumer = NULL;
+
+ // Close open resources.
+ try{
+ if( session != NULL ) session->close();
+ if( connection != NULL ) connection->close();
+ }catch (CMSException& e) {}
+
+ try{
+ if( session != NULL ) delete session;
+ }catch (CMSException& e) {}
+ session = NULL;
+
+ try{
+ if( connection != NULL ) delete connection;
+ }catch (CMSException& e) {}
+ connection = NULL;
+ }
+ };
+
+}}}
+
using namespace std;
using namespace integration;
using namespace integration::connector::stomp;
-ExpirationTest::Producer::Producer( string topic, int numMessages, long long timeToLive ){
- connection = NULL;
- session = NULL;
- destination = NULL;
- producer = NULL;
- this->numMessages = numMessages;
- this->timeToLive = timeToLive;
- this->disableTimeStamps = false;
- this->topic = topic;
-}
-
-ExpirationTest::Producer::~Producer(){
- cleanup();
-}
-
-bool ExpirationTest::Producer::getDisableTimeStamps() const {
- return disableTimeStamps;
-}
-
-void ExpirationTest::Producer::setDisableTimeStamps( bool value ) {
- this->disableTimeStamps = value;
-}
-
-void ExpirationTest::Producer::run() {
- try {
- // Create a ConnectionFactory
- ActiveMQConnectionFactory* connectionFactory =
- new ActiveMQConnectionFactory( IntegrationCommon::getInstance().getStompURL() );
-
- // Create a Connection
- connection = connectionFactory->createConnection();
- delete connectionFactory;
- connection->start();
-
- string sss=connection->getClientID();
- cout << sss << endl;
-
- session = connection->createSession( Session::AUTO_ACKNOWLEDGE);
- destination = session->createTopic( topic );
-
- producer = session->createProducer( destination );
- producer->setDeliveryMode( DeliveryMode::PERSISTENT );
- producer->setDisableMessageTimeStamp( disableTimeStamps );
-
- //unsigned long ttt=getcurt();
- producer->setTimeToLive( 1);
-
- // Create the Thread Id String
- string threadIdStr = Integer::toString( Thread::getId() );
-
- // Create a messages
- string text = (string)"Hello world! from thread " + threadIdStr;
-
- for( int ix=0; ix<numMessages; ++ix ){
- TextMessage* message = session->createTextMessage( text );
- producer->send( message );
- delete message;
- }
-
- }catch ( CMSException& e ) {
- e.printStackTrace();
- }
- }
-
-void ExpirationTest::Producer::cleanup(){
-
- // Destroy resources.
- try{
- if( destination != NULL ) delete destination;
- }catch ( CMSException& e ) {}
- destination = NULL;
-
- try{
- if( producer != NULL ) delete producer;
- }catch ( CMSException& e ) {}
- producer = NULL;
-
- // Close open resources.
- try{
- if( session != NULL ) session->close();
- if( connection != NULL ) connection->close();
- }catch ( CMSException& e ) {}
-
- try{
- if( session != NULL ) delete session;
- }catch ( CMSException& e ) {}
- session = NULL;
-
- try{
- if( connection != NULL ) delete connection;
- }catch ( CMSException& e ) {}
- connection = NULL;
-}
-
-ExpirationTest::Consumer::Consumer( string topic, long waitMillis ){
- connection = NULL;
- session = NULL;
- destination = NULL;
- consumer = NULL;
- this->waitMillis = waitMillis;
- numReceived = 0;
- this->topic = topic;
-}
-
-ExpirationTest::Consumer::~Consumer(){
- cleanup();
-}
-
-int ExpirationTest::Consumer::getNumReceived() const{
- return numReceived;
-}
-
-void ExpirationTest::Consumer::run() {
-
- try {
-
- string user,passwd,sID;
- user="default";
- passwd="";
- sID="lsgID";
+////////////////////////////////////////////////////////////////////////////////
+void ExpirationTest::testExpired() {
- // Create a Connection
- connection = ActiveMQConnectionFactory::createConnection(
- IntegrationCommon::getInstance().getStompURL(), user, passwd, sID );
-
- connection->start();
-
- // Create a Session
- session = connection->createSession( Session::AUTO_ACKNOWLEDGE);
-
- // Create the destination (Topic or Queue)
- string t = topic + "?consumer.retroactive=true";
-
- destination = session->createTopic( t );
-
- consumer = session->createConsumer( destination );
-
- consumer->setMessageListener( this );
-
- // Sleep while asynchronous messages come in.
- Thread::sleep( waitMillis );
-
- } catch (CMSException& e) {
- e.printStackTrace();
- }
-}
-
-void ExpirationTest::Consumer::onMessage( const Message* message ){
-
- try
- {
- const TextMessage* textMessage =
- dynamic_cast< const TextMessage* >( message );
- string text = textMessage->getText();
- numReceived++;
- } catch (CMSException& e) {
- e.printStackTrace();
- }
-}
-
-void ExpirationTest::Consumer::cleanup(){
-
- // Destroy resources.
- try{
- if( destination != NULL ) delete destination;
- }catch (CMSException& e) {}
- destination = NULL;
-
- try{
- if( consumer != NULL ) delete consumer;
- }catch (CMSException& e) {}
- consumer = NULL;
-
- // Close open resources.
- try{
- if( session != NULL ) session->close();
- if( connection != NULL ) connection->close();
- }catch (CMSException& e) {}
-
- try{
- if( session != NULL ) delete session;
- }catch (CMSException& e) {}
- session = NULL;
-
- try{
- if( connection != NULL ) delete connection;
- }catch (CMSException& e) {}
- connection = NULL;
-}
-
-void ExpirationTest::testExpired()
-{
string topic = UUID::randomUUID().toString();
Producer producer( topic, 1, 1 );
Thread producerThread( &producer );
@@ -288,10 +334,11 @@
CPPUNIT_ASSERT_EQUAL( 0, consumer.getNumReceived() );
}
-void ExpirationTest::testNotExpired()
-{
+////////////////////////////////////////////////////////////////////////////////
+void ExpirationTest::testNotExpired() {
+
string topic = UUID::randomUUID().toString();
- Producer producer( topic, 2, 2000 );
+ Producer producer( topic, 5, 3000 );
producer.setDisableTimeStamps( true );
Thread producerThread( &producer );
producerThread.start();
@@ -302,8 +349,5 @@
consumerThread.start();
consumerThread.join();
- Thread::sleep( 50 );
-
- CPPUNIT_ASSERT_EQUAL( 2, consumer.getNumReceived() );
+ CPPUNIT_ASSERT_EQUAL( 5, consumer.getNumReceived() );
}
-
Modified: activemq/activemq-cpp/trunk/src/test-integration/integration/connector/stomp/ExpirationTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test-integration/integration/connector/stomp/ExpirationTest.h?rev=652713&r1=652712&r2=652713&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test-integration/integration/connector/stomp/ExpirationTest.h (original)
+++ activemq/activemq-cpp/trunk/src/test-integration/integration/connector/stomp/ExpirationTest.h Thu May 1 16:39:54 2008
@@ -28,13 +28,11 @@
#include <cms/Session.h>
#include <cms/MessageProducer.h>
-#include <decaf/lang/Runnable.h>
-
namespace integration{
namespace connector{
namespace stomp{
- class ExpirationTest : public CppUnit::TestFixture
+ class ExpirationTest : public CppUnit::TestFixture
{
CPPUNIT_TEST_SUITE( ExpirationTest );
CPPUNIT_TEST( testExpired );
@@ -42,72 +40,12 @@
CPPUNIT_TEST_SUITE_END();
public:
-
+
ExpirationTest(){}
virtual ~ExpirationTest(){}
-
+
virtual void testExpired();
virtual void testNotExpired();
-
- public:
-
- class Producer : public decaf::lang::Runnable {
- private:
-
- cms::Connection* connection;
- cms::Session* session;
- cms::Topic* destination;
- cms::MessageProducer* producer;
- int numMessages;
- long long timeToLive;
- bool disableTimeStamps;
- std::string topic;
-
- public:
-
- Producer( std::string topic, int numMessages, long long timeToLive );
-
- virtual ~Producer();
-
- virtual bool getDisableTimeStamps() const;
-
- virtual void setDisableTimeStamps( bool value );
-
- virtual void run();
-
- private:
-
- void cleanup();
- };
-
- class Consumer : public cms::MessageListener, public decaf::lang::Runnable {
-
- private:
-
- cms::Connection* connection;
- cms::Session* session;
- cms::Topic* destination;
- cms::MessageConsumer* consumer;
- long waitMillis;
- int numReceived;
- std::string topic;
-
- public:
-
- Consumer( std::string topic, long waitMillis );
-
- virtual ~Consumer();
-
- virtual int getNumReceived() const;
-
- virtual void run();
-
- virtual void onMessage( const cms::Message* message );
-
- private:
-
- void cleanup();
- };
};