You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2009/04/08 01:59:08 UTC
svn commit: r763046 [2/2] - in
/activemq/activemq-cpp/trunk/activemq-cpp/src: main/ main/activemq/core/
test/ test/activemq/core/
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.cpp?rev=763046&r1=763045&r2=763046&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.cpp Tue Apr 7 23:59:07 2009
@@ -41,31 +41,95 @@
using namespace decaf::lang;
////////////////////////////////////////////////////////////////////////////////
+namespace activemq{
+namespace core{
+
+ class MyCMSMessageListener : public cms::MessageListener
+ {
+ public:
+
+ std::vector< Pointer<cms::Message> > messages;
+ decaf::util::concurrent::Mutex mutex;
+ bool ack;
+
+ public:
+
+ MyCMSMessageListener( bool ack = false ){
+ this->ack = ack;
+ }
+
+ virtual ~MyCMSMessageListener(){
+ clear();
+ }
+
+ virtual void setAck( bool ack ){
+ this->ack = ack;
+ }
+
+ virtual void clear() {
+ messages.clear();
+ }
+
+ virtual void onMessage( const cms::Message* message ) {
+
+ synchronized( &mutex ) {
+ if( ack ){
+ message->acknowledge();
+ }
+
+ messages.push_back( Pointer<cms::Message>( message->clone() ) );
+ mutex.notifyAll();
+ }
+ }
+
+ void asyncWaitForMessages( unsigned int count ) {
+
+ try {
+
+ synchronized( &mutex ) {
+ int stopAtZero = count + 5;
+
+ while( messages.size() < count ) {
+ mutex.wait( 750 );
+
+ if( --stopAtZero == 0 ) {
+ break;
+ }
+ }
+ }
+ }
+ AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
+ AMQ_CATCHALL_THROW( activemq::exceptions::ActiveMQException )
+ }
+ };
+}}
+
+////////////////////////////////////////////////////////////////////////////////
void ActiveMQSessionTest::testAutoAcking() {
MyCMSMessageListener msgListener1;
MyCMSMessageListener msgListener2;
- CPPUNIT_ASSERT( connection != NULL );
+ CPPUNIT_ASSERT( connection.get() != NULL );
// Create an Auto Ack Session
- cms::Session* session = connection->createSession();
+ std::auto_ptr<cms::Session> session( connection->createSession() );
// Create a Topic
- cms::Topic* topic1 = session->createTopic( "TestTopic1" );
- cms::Topic* topic2 = session->createTopic( "TestTopic2" );
+ std::auto_ptr<cms::Topic> topic1( session->createTopic( "TestTopic1" ) );
+ std::auto_ptr<cms::Topic> topic2( session->createTopic( "TestTopic2" ) );
- CPPUNIT_ASSERT( topic1 != NULL );
- CPPUNIT_ASSERT( topic2 != NULL );
+ CPPUNIT_ASSERT( topic1.get() != NULL );
+ CPPUNIT_ASSERT( topic2.get() != NULL );
// Create a consumer
- ActiveMQConsumer* consumer1 =
- dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic1 ) );
- ActiveMQConsumer* consumer2 =
- dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic2 ) );
+ std::auto_ptr<ActiveMQConsumer> consumer1(
+ dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic1.get() ) ) );
+ std::auto_ptr<ActiveMQConsumer> consumer2(
+ dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic2.get() ) ) );
- CPPUNIT_ASSERT( consumer1 != NULL );
- CPPUNIT_ASSERT( consumer2 != NULL );
+ CPPUNIT_ASSERT( consumer1.get() != NULL );
+ CPPUNIT_ASSERT( consumer2.get() != NULL );
CPPUNIT_ASSERT( consumer1->getMessageSelector() == "" );
CPPUNIT_ASSERT( consumer2->getMessageSelector() == "" );
@@ -80,80 +144,53 @@
injectTextMessage( "This is a Test 1" , *topic1, consumer1->getConsumerId() );
- synchronized( &msgListener1.mutex )
- {
- if( msgListener1.messages.size() == 0 )
- {
- msgListener1.mutex.wait( 3000 );
- }
- }
+ msgListener1.asyncWaitForMessages( 1 );
CPPUNIT_ASSERT( msgListener1.messages.size() == 1 );
injectTextMessage( "This is a Test 2" , *topic2, consumer2->getConsumerId() );
- synchronized( &msgListener2.mutex )
- {
- if( msgListener2.messages.size() == 0 )
- {
- msgListener2.mutex.wait( 3000 );
- }
- }
+ msgListener2.asyncWaitForMessages( 1 );
CPPUNIT_ASSERT( msgListener2.messages.size() == 1 );
- cms::TextMessage* msg1 =
- dynamic_cast< cms::TextMessage* >(
- msgListener1.messages[0] );
- cms::TextMessage* msg2 =
- dynamic_cast< cms::TextMessage* >(
- msgListener2.messages[0] );
-
- CPPUNIT_ASSERT( msg1 != NULL );
- CPPUNIT_ASSERT( msg2 != NULL );
+ Pointer<cms::TextMessage> msg1 = msgListener1.messages[0].dynamicCast<cms::TextMessage>();
+ Pointer<cms::TextMessage> msg2 = msgListener2.messages[0].dynamicCast<cms::TextMessage>();
std::string text1 = msg1->getText();
std::string text2 = msg2->getText();
CPPUNIT_ASSERT( text1 == "This is a Test 1" );
CPPUNIT_ASSERT( text2 == "This is a Test 2" );
-
- delete topic1;
- delete topic2;
-
- delete consumer1;
- delete consumer2;
-
- delete session;
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSessionTest::testClientAck()
-{
+void ActiveMQSessionTest::testClientAck() {
+
MyCMSMessageListener msgListener1( true );
MyCMSMessageListener msgListener2( true );
- CPPUNIT_ASSERT( connection != NULL );
+ CPPUNIT_ASSERT( connection.get() != NULL );
- // Create an Auto Ack Session
- cms::Session* session = connection->createSession(
- cms::Session::CLIENT_ACKNOWLEDGE );
+ // Create an Client Ack Session
+ std::auto_ptr<cms::Session> session(
+ connection->createSession( cms::Session::CLIENT_ACKNOWLEDGE ) );
// Create a Topic
- cms::Topic* topic1 = session->createTopic( "TestTopic1");
- cms::Topic* topic2 = session->createTopic( "TestTopic2");
+ std::auto_ptr<cms::Topic> topic1( session->createTopic( "TestTopic1" ) );
+ std::auto_ptr<cms::Topic> topic2( session->createTopic( "TestTopic2" ) );
- CPPUNIT_ASSERT( topic1 != NULL );
- CPPUNIT_ASSERT( topic2 != NULL );
+ CPPUNIT_ASSERT( topic1.get() != NULL );
+ CPPUNIT_ASSERT( topic2.get() != NULL );
// Create a consumer
- ActiveMQConsumer* consumer1 =
- dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic1 ) );
- ActiveMQConsumer* consumer2 =
- dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic2 ) );
+ std::auto_ptr<ActiveMQConsumer> consumer1(
+ dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic1.get() ) ) );
+ std::auto_ptr<ActiveMQConsumer> consumer2(
+ dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic2.get() ) ) );
- CPPUNIT_ASSERT( consumer1 != NULL );
- CPPUNIT_ASSERT( consumer2 != NULL );
+ CPPUNIT_ASSERT( consumer1.get() != NULL );
+ CPPUNIT_ASSERT( consumer2.get() != NULL );
CPPUNIT_ASSERT( consumer1->getMessageSelector() == "" );
CPPUNIT_ASSERT( consumer2->getMessageSelector() == "" );
@@ -168,13 +205,7 @@
injectTextMessage( "This is a Test 1" , *topic1, consumer1->getConsumerId() );
- synchronized( &msgListener1.mutex )
- {
- if( msgListener1.messages.size() == 0 )
- {
- msgListener1.mutex.wait( 3000 );
- }
- }
+ msgListener1.asyncWaitForMessages( 1 );
CPPUNIT_ASSERT_EQUAL( 1, (int)msgListener1.messages.size() );
@@ -182,70 +213,99 @@
injectTextMessage( "This is a Test 2" , *topic2, consumer2->getConsumerId() );
- synchronized( &msgListener2.mutex )
- {
- if( msgListener2.messages.size() == 0 )
- {
- msgListener2.mutex.wait( 3000 );
- }
- }
+ msgListener2.asyncWaitForMessages( 1 );
CPPUNIT_ASSERT_EQUAL( 1, (int)msgListener2.messages.size() );
msgListener2.messages[0]->acknowledge();
- cms::TextMessage* msg1 =
- dynamic_cast< cms::TextMessage* >(
- msgListener1.messages[0] );
- cms::TextMessage* msg2 =
- dynamic_cast< cms::TextMessage* >(
- msgListener2.messages[0] );
-
- CPPUNIT_ASSERT( msg1 != NULL );
- CPPUNIT_ASSERT( msg2 != NULL );
+ Pointer<cms::TextMessage> msg1 = msgListener1.messages[0].dynamicCast<cms::TextMessage>();
+ Pointer<cms::TextMessage> msg2 = msgListener2.messages[0].dynamicCast<cms::TextMessage>();
std::string text1 = msg1->getText();
std::string text2 = msg2->getText();
CPPUNIT_ASSERT( text1 == "This is a Test 1" );
CPPUNIT_ASSERT( text2 == "This is a Test 2" );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionTest::testTransactionCommitOneConsumer() {
+
+ static const int MSG_COUNT = 50;
+
+ MyCMSMessageListener msgListener1;
+
+ CPPUNIT_ASSERT( connection.get() != NULL );
- delete topic1;
- delete topic2;
+ // Create an Transacted Session
+ std::auto_ptr<cms::Session> session(
+ connection->createSession( cms::Session::SESSION_TRANSACTED ) );
- delete consumer1;
- delete consumer2;
+ // Create a Topic
+ std::auto_ptr<cms::Topic> topic1( session->createTopic( "TestTopic1" ) );
+
+ CPPUNIT_ASSERT( topic1.get() != NULL );
+
+ // Create a consumer
+ std::auto_ptr<ActiveMQConsumer> consumer1(
+ dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic1.get() ) ) );
+
+ CPPUNIT_ASSERT( consumer1.get() != NULL );
+
+ CPPUNIT_ASSERT( consumer1->getMessageSelector() == "" );
+
+ CPPUNIT_ASSERT( consumer1->receiveNoWait() == NULL );
+ CPPUNIT_ASSERT( consumer1->receive( 5 ) == NULL );
+
+ consumer1->setMessageListener( &msgListener1 );
+
+ for( int i = 0; i < MSG_COUNT; ++i ) {
+ injectTextMessage( "This is a Test 1" , *topic1, consumer1->getConsumerId() );
+ }
+
+ msgListener1.asyncWaitForMessages( MSG_COUNT );
- delete session;
+ CPPUNIT_ASSERT_EQUAL( MSG_COUNT, (int)msgListener1.messages.size() );
+
+ session->commit();
+
+ Pointer<cms::TextMessage> msg1 = msgListener1.messages[0].dynamicCast<cms::TextMessage>();
+
+ std::string text1 = msg1->getText();
+
+ CPPUNIT_ASSERT( text1 == "This is a Test 1" );
+
+ msgListener1.clear();
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSessionTest::testTransactional()
-{
+void ActiveMQSessionTest::testTransactionCommitTwoConsumer() {
+
MyCMSMessageListener msgListener1;
MyCMSMessageListener msgListener2;
- CPPUNIT_ASSERT( connection != NULL );
+ CPPUNIT_ASSERT( connection.get() != NULL );
// Create an Auto Ack Session
- cms::Session* session = connection->createSession(
- cms::Session::SESSION_TRANSACTED );
+ std::auto_ptr<cms::Session> session(
+ connection->createSession( cms::Session::SESSION_TRANSACTED ) );
// Create a Topic
- cms::Topic* topic1 = session->createTopic( "TestTopic1");
- cms::Topic* topic2 = session->createTopic( "TestTopic2");
+ std::auto_ptr<cms::Topic> topic1( session->createTopic( "TestTopic1" ) );
+ std::auto_ptr<cms::Topic> topic2( session->createTopic( "TestTopic2" ) );
- CPPUNIT_ASSERT( topic1 != NULL );
- CPPUNIT_ASSERT( topic2 != NULL );
+ CPPUNIT_ASSERT( topic1.get() != NULL );
+ CPPUNIT_ASSERT( topic2.get() != NULL );
// Create a consumer
- ActiveMQConsumer* consumer1 =
- dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic1 ) );
- ActiveMQConsumer* consumer2 =
- dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic2 ) );
+ std::auto_ptr<ActiveMQConsumer> consumer1(
+ dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic1.get() ) ) );
+ std::auto_ptr<ActiveMQConsumer> consumer2(
+ dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic2.get() ) ) );
- CPPUNIT_ASSERT( consumer1 != NULL );
- CPPUNIT_ASSERT( consumer2 != NULL );
+ CPPUNIT_ASSERT( consumer1.get() != NULL );
+ CPPUNIT_ASSERT( consumer2.get() != NULL );
CPPUNIT_ASSERT( consumer1->getMessageSelector() == "" );
CPPUNIT_ASSERT( consumer2->getMessageSelector() == "" );
@@ -260,41 +320,20 @@
injectTextMessage( "This is a Test 1" , *topic1, consumer1->getConsumerId() );
- synchronized( &msgListener1.mutex )
- {
- if( msgListener1.messages.size() == 0 )
- {
- msgListener1.mutex.wait( 3000 );
- }
- }
+ msgListener1.asyncWaitForMessages( 1 );
CPPUNIT_ASSERT_EQUAL( 1, (int)msgListener1.messages.size() );
- session->commit();
-
injectTextMessage( "This is a Test 2" , *topic2, consumer2->getConsumerId() );
- synchronized( &msgListener2.mutex )
- {
- if( msgListener2.messages.size() == 0 )
- {
- msgListener2.mutex.wait( 3000 );
- }
- }
+ msgListener2.asyncWaitForMessages( 1 );
CPPUNIT_ASSERT( msgListener2.messages.size() == 1 );
session->commit();
- cms::TextMessage* msg1 =
- dynamic_cast< cms::TextMessage* >(
- msgListener1.messages[0] );
- cms::TextMessage* msg2 =
- dynamic_cast< cms::TextMessage* >(
- msgListener2.messages[0] );
-
- CPPUNIT_ASSERT( msg1 != NULL );
- CPPUNIT_ASSERT( msg2 != NULL );
+ Pointer<cms::TextMessage> msg1 = msgListener1.messages[0].dynamicCast<cms::TextMessage>();
+ Pointer<cms::TextMessage> msg2 = msgListener2.messages[0].dynamicCast<cms::TextMessage>();
std::string text1 = msg1->getText();
std::string text2 = msg2->getText();
@@ -304,57 +343,117 @@
msgListener1.clear();
msgListener2.clear();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionTest::testTransactionRollbackOneConsumer() {
+
+ MyCMSMessageListener msgListener1;
+
+ CPPUNIT_ASSERT( connection.get() != NULL );
+
+ std::auto_ptr<cms::Session> session(
+ connection->createSession( cms::Session::SESSION_TRANSACTED ) );
+
+ // Create a Topic
+ std::auto_ptr<cms::Topic> topic1( session->createTopic( "TestTopic1" ) );
+
+ CPPUNIT_ASSERT( topic1.get() != NULL );
+
+ // Create a consumer
+ std::auto_ptr<ActiveMQConsumer> consumer1(
+ dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic1.get() ) ) );
+
+ CPPUNIT_ASSERT( consumer1.get() != NULL );
+
+ CPPUNIT_ASSERT( consumer1->getMessageSelector() == "" );
+
+ CPPUNIT_ASSERT( consumer1->receiveNoWait() == NULL );
+ CPPUNIT_ASSERT( consumer1->receive( 5 ) == NULL );
+
+ consumer1->setMessageListener( &msgListener1 );
const unsigned int msgCount = 50;
- for( unsigned int i = 0; i < msgCount; ++i )
- {
+ for( unsigned int i = 0; i < msgCount; ++i ) {
std::ostringstream stream;
-
stream << "This is test message #" << i << std::ends;
-
injectTextMessage( stream.str() , *topic1, consumer1->getConsumerId() );
}
- for( unsigned int i = 0; i < msgCount; ++i )
- {
- std::ostringstream stream;
-
- stream << "This is test message #" << i << std::ends;
+ msgListener1.asyncWaitForMessages( msgCount );
- injectTextMessage( stream.str() , *topic2, consumer2->getConsumerId() );
- }
+ CPPUNIT_ASSERT( msgListener1.messages.size() == msgCount );
- synchronized( &msgListener1.mutex )
- {
- const unsigned int interval = msgCount + 10;
- unsigned int count = 0;
+ msgListener1.clear();
- while( msgListener1.messages.size() != msgCount &&
- count < interval )
- {
- msgListener1.mutex.wait( 3000 );
+ session->rollback();
- ++count;
- }
- }
+ msgListener1.asyncWaitForMessages( msgCount );
CPPUNIT_ASSERT( msgListener1.messages.size() == msgCount );
- synchronized( &msgListener2.mutex )
- {
- const int interval = msgCount + 10;
- int count = 0;
+ session->commit();
+}
- while( msgListener2.messages.size() != msgCount &&
- count < interval )
- {
- msgListener2.mutex.wait( 3000 );
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionTest::testTransactionRollbackTwoConsumer() {
- ++count;
- }
+ MyCMSMessageListener msgListener1;
+ MyCMSMessageListener msgListener2;
+
+ CPPUNIT_ASSERT( connection.get() != NULL );
+
+ std::auto_ptr<cms::Session> session(
+ connection->createSession( cms::Session::SESSION_TRANSACTED ) );
+
+ // Create a Topic
+ std::auto_ptr<cms::Topic> topic1( session->createTopic( "TestTopic1" ) );
+ std::auto_ptr<cms::Topic> topic2( session->createTopic( "TestTopic2" ) );
+
+ CPPUNIT_ASSERT( topic1.get() != NULL );
+ CPPUNIT_ASSERT( topic2.get() != NULL );
+
+ // Create a consumer
+ std::auto_ptr<ActiveMQConsumer> consumer1(
+ dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic1.get() ) ) );
+ std::auto_ptr<ActiveMQConsumer> consumer2(
+ dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic2.get() ) ) );
+
+ CPPUNIT_ASSERT( consumer1.get() != NULL );
+ CPPUNIT_ASSERT( consumer2.get() != NULL );
+
+ CPPUNIT_ASSERT( consumer1->getMessageSelector() == "" );
+ CPPUNIT_ASSERT( consumer2->getMessageSelector() == "" );
+
+ CPPUNIT_ASSERT( consumer1->receiveNoWait() == NULL );
+ CPPUNIT_ASSERT( consumer1->receive( 5 ) == NULL );
+ CPPUNIT_ASSERT( consumer2->receiveNoWait() == NULL );
+ CPPUNIT_ASSERT( consumer2->receive( 5 ) == NULL );
+
+ consumer1->setMessageListener( &msgListener1 );
+ consumer2->setMessageListener( &msgListener2 );
+
+ const unsigned int msgCount = 50;
+
+ for( unsigned int i = 0; i < msgCount; ++i ) {
+ std::ostringstream stream;
+ stream << "This is test message #" << i << std::ends;
+ injectTextMessage( stream.str() , *topic1, consumer1->getConsumerId() );
}
+ for( unsigned int i = 0; i < msgCount; ++i ) {
+ std::ostringstream stream;
+ stream << "This is test message #" << i << std::ends;
+ injectTextMessage( stream.str() , *topic2, consumer2->getConsumerId() );
+ }
+
+ msgListener1.asyncWaitForMessages( msgCount );
+
+ CPPUNIT_ASSERT( msgListener1.messages.size() == msgCount );
+
+ msgListener2.asyncWaitForMessages( msgCount );
+
CPPUNIT_ASSERT( msgListener2.messages.size() == msgCount );
msgListener1.clear();
@@ -362,73 +461,93 @@
session->rollback();
- synchronized( &msgListener1.mutex )
- {
- const int interval = msgCount + 10;
- int count = 0;
+ msgListener1.asyncWaitForMessages( msgCount );
- while( msgListener1.messages.size() != msgCount &&
- count < interval )
- {
- msgListener1.mutex.wait( 3000 );
+ CPPUNIT_ASSERT( msgListener1.messages.size() == msgCount );
- ++count;
- }
- }
+ msgListener2.asyncWaitForMessages( msgCount );
- CPPUNIT_ASSERT( msgListener1.messages.size() == msgCount );
+ CPPUNIT_ASSERT( msgListener2.messages.size() == msgCount );
- synchronized( &msgListener2.mutex )
- {
- const int interval = msgCount + 10;
- int count = 0;
+ session->commit();
+}
- while( msgListener2.messages.size() != msgCount &&
- count < interval )
- {
- msgListener2.mutex.wait( 3000 );
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionTest::testTransactionCloseWithoutCommit() {
- ++count;
- }
+ static const int MSG_COUNT = 50;
+
+ MyCMSMessageListener msgListener1;
+
+ CPPUNIT_ASSERT( connection.get() != NULL );
+
+ // Create an Transacted Session
+ std::auto_ptr<cms::Session> session(
+ connection->createSession( cms::Session::SESSION_TRANSACTED ) );
+
+ // Create a Topic
+ std::auto_ptr<cms::Topic> topic1( session->createTopic( "TestTopic1" ) );
+
+ CPPUNIT_ASSERT( topic1.get() != NULL );
+
+ // Create a consumer
+ std::auto_ptr<ActiveMQConsumer> consumer1(
+ dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic1.get() ) ) );
+
+ CPPUNIT_ASSERT( consumer1.get() != NULL );
+
+ CPPUNIT_ASSERT( consumer1->getMessageSelector() == "" );
+
+ CPPUNIT_ASSERT( consumer1->receiveNoWait() == NULL );
+ CPPUNIT_ASSERT( consumer1->receive( 5 ) == NULL );
+
+ consumer1->setMessageListener( &msgListener1 );
+
+ for( int i = 0; i < MSG_COUNT; ++i ) {
+ injectTextMessage( "This is a Test 1" , *topic1, consumer1->getConsumerId() );
}
- CPPUNIT_ASSERT( msgListener2.messages.size() == msgCount );
+ msgListener1.asyncWaitForMessages( MSG_COUNT );
- delete topic1;
- delete topic2;
+ CPPUNIT_ASSERT_EQUAL( MSG_COUNT, (int)msgListener1.messages.size() );
- delete consumer1;
- delete consumer2;
+ // This is what we are testing, since there was no commit, the session
+ // will rollback the transaction when this are closed.
+ // session->commit();
- delete session;
+ // TODO - We should be able to close the consumer but we don't have a way
+ // to keep the consumer alive if deleted after calling close so the session
+ // segfaults on attempting to rollback the transaction and visiting all its
+ // registered Synchronizations.
+ session->close();
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSessionTest::testExpiration()
-{
+void ActiveMQSessionTest::testExpiration() {
+
MyCMSMessageListener msgListener1;
MyCMSMessageListener msgListener2;
- CPPUNIT_ASSERT( connection != NULL );
+ CPPUNIT_ASSERT( connection.get() != NULL );
// Create an Auto Ack Session
- cms::Session* session = connection->createSession();
+ std::auto_ptr<cms::Session> session( connection->createSession() );
// Create a Topic
- cms::Topic* topic1 = session->createTopic( "TestTopic1");
- cms::Topic* topic2 = session->createTopic( "TestTopic2");
+ std::auto_ptr<cms::Topic> topic1( session->createTopic( "TestTopic1" ) );
+ std::auto_ptr<cms::Topic> topic2( session->createTopic( "TestTopic2" ) );
- CPPUNIT_ASSERT( topic1 != NULL );
- CPPUNIT_ASSERT( topic2 != NULL );
+ CPPUNIT_ASSERT( topic1.get() != NULL );
+ CPPUNIT_ASSERT( topic2.get() != NULL );
// Create a consumer
- ActiveMQConsumer* consumer1 =
- dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic1 ) );
- ActiveMQConsumer* consumer2 =
- dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic2 ) );
+ std::auto_ptr<ActiveMQConsumer> consumer1(
+ dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic1.get() ) ) );
+ std::auto_ptr<ActiveMQConsumer> consumer2(
+ dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic2.get() ) ) );
- CPPUNIT_ASSERT( consumer1 != NULL );
- CPPUNIT_ASSERT( consumer2 != NULL );
+ CPPUNIT_ASSERT( consumer1.get() != NULL );
+ CPPUNIT_ASSERT( consumer2.get() != NULL );
consumer1->setMessageListener( &msgListener1 );
consumer2->setMessageListener( &msgListener2 );
@@ -439,13 +558,7 @@
decaf::util::Date::getCurrentTimeMilliseconds(),
50 );
- synchronized( &msgListener1.mutex )
- {
- if( msgListener1.messages.size() == 0 )
- {
- msgListener1.mutex.wait( 3000 );
- }
- }
+ msgListener1.asyncWaitForMessages( 1 );
CPPUNIT_ASSERT( msgListener1.messages.size() == 1 );
@@ -455,33 +568,15 @@
decaf::util::Date::getCurrentTimeMilliseconds() - 100,
1 );
- synchronized( &msgListener2.mutex )
- {
- if( msgListener2.messages.size() == 0 )
- {
- msgListener2.mutex.wait( 100 );
- }
- }
+ msgListener2.asyncWaitForMessages( 1 );
CPPUNIT_ASSERT( msgListener2.messages.size() == 0 );
- cms::TextMessage* msg1 =
- dynamic_cast< cms::TextMessage* >(
- msgListener1.messages[0] );
-
- CPPUNIT_ASSERT( msg1 != NULL );
+ Pointer<cms::TextMessage> msg1 = msgListener1.messages[0].dynamicCast<cms::TextMessage>();
std::string text1 = msg1->getText();
CPPUNIT_ASSERT( text1 == "This is a Test 1" );
-
- delete topic1;
- delete topic2;
-
- delete consumer1;
- delete consumer2;
-
- delete session;
}
////////////////////////////////////////////////////////////////////////////////
@@ -491,11 +586,11 @@
{
ActiveMQConnectionFactory factory("mock://127.0.0.1:12345?wireFormat=openwire");
- connection = dynamic_cast< ActiveMQConnection*>(
- factory.createConnection() );
+ connection.reset( dynamic_cast< ActiveMQConnection*>( factory.createConnection() ) );
- // Get the Transport and make sure we got a dummy Transport
- dTransport = transport::mock::MockTransport::getInstance();
+ // Get a pointer to the Mock Transport for Message injection.
+ dTransport = dynamic_cast<transport::mock::MockTransport*>(
+ connection->getTransport().narrow( typeid( transport::mock::MockTransport ) ) );
CPPUNIT_ASSERT( dTransport != NULL );
connection->setExceptionListener( &exListener );
@@ -511,7 +606,7 @@
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSessionTest::tearDown() {
- delete connection;
+ connection.reset( NULL );
}
////////////////////////////////////////////////////////////////////////////////
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h?rev=763046&r1=763045&r2=763046&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h Tue Apr 7 23:59:07 2009
@@ -30,6 +30,7 @@
#include <activemq/transport/mock/MockTransport.h>
#include <activemq/util/Config.h>
#include <activemq/commands/ConsumerId.h>
+#include <memory>
namespace activemq{
namespace core{
@@ -39,7 +40,11 @@
CPPUNIT_TEST_SUITE( ActiveMQSessionTest );
CPPUNIT_TEST( testAutoAcking );
CPPUNIT_TEST( testClientAck );
- //CPPUNIT_TEST( testTransactional );
+ CPPUNIT_TEST( testTransactionCommitOneConsumer );
+ CPPUNIT_TEST( testTransactionCommitTwoConsumer );
+ CPPUNIT_TEST( testTransactionRollbackOneConsumer );
+ CPPUNIT_TEST( testTransactionRollbackTwoConsumer );
+ CPPUNIT_TEST( testTransactionCloseWithoutCommit );
CPPUNIT_TEST( testExpiration );
CPPUNIT_TEST_SUITE_END();
@@ -60,56 +65,7 @@
}
};
- class MyCMSMessageListener : public cms::MessageListener
- {
- public:
-
- std::vector<cms::Message*> messages;
- decaf::util::concurrent::Mutex mutex;
- bool ack;
-
- public:
-
- MyCMSMessageListener( bool ack = false ){
- this->ack = ack;
- }
-
- virtual ~MyCMSMessageListener(){
- clear();
- }
-
- virtual void setAck( bool ack ){
- this->ack = ack;
- }
-
- virtual void clear() {
- std::vector<cms::Message*>::iterator itr =
- messages.begin();
-
- for( ; itr != messages.end(); ++itr )
- {
- delete *itr;
- }
-
- messages.clear();
- }
-
- virtual void onMessage( const cms::Message* message )
- {
- synchronized( &mutex )
- {
- if( ack ){
- message->acknowledge();
- }
-
- messages.push_back( message->clone() );
-
- mutex.notifyAll();
- }
- }
- };
-
- ActiveMQConnection* connection;
+ std::auto_ptr<ActiveMQConnection> connection;
transport::mock::MockTransport* dTransport;
MyExceptionListener exListener;
@@ -130,7 +86,11 @@
void testAutoAcking();
void testClientAck();
- void testTransactional();
+ void testTransactionCommitOneConsumer();
+ void testTransactionCommitTwoConsumer();
+ void testTransactionRollbackOneConsumer();
+ void testTransactionRollbackTwoConsumer();
+ void testTransactionCloseWithoutCommit();
void testExpiration();
};
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/MessageDispatchChannelTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/MessageDispatchChannelTest.cpp?rev=763046&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/MessageDispatchChannelTest.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/MessageDispatchChannelTest.cpp Tue Apr 7 23:59:07 2009
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "MessageDispatchChannelTest.h"
+
+#include <activemq/core/MessageDispatchChannel.h>
+#include <activemq/commands/MessageDispatch.h>
+#include <decaf/lang/Pointer.h>
+#include <decaf/lang/System.h>
+
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::commands;
+using namespace decaf;
+using namespace decaf::lang;
+
+////////////////////////////////////////////////////////////////////////////////
+void MessageDispatchChannelTest::testCtor() {
+
+ MessageDispatchChannel channel;
+ CPPUNIT_ASSERT( channel.isRunning() == false );
+ CPPUNIT_ASSERT( channel.isEmpty() == true );
+ CPPUNIT_ASSERT( channel.size() == 0 );
+ CPPUNIT_ASSERT( channel.isClosed() == false );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessageDispatchChannelTest::testStart() {
+
+ MessageDispatchChannel channel;
+ channel.start();
+ CPPUNIT_ASSERT( channel.isRunning() == true );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessageDispatchChannelTest::testStop() {
+
+ MessageDispatchChannel channel;
+ channel.start();
+ CPPUNIT_ASSERT( channel.isRunning() == true );
+ channel.stop();
+ CPPUNIT_ASSERT( channel.isRunning() == false );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessageDispatchChannelTest::testClose() {
+
+ MessageDispatchChannel channel;
+ channel.start();
+ CPPUNIT_ASSERT( channel.isRunning() == true );
+ CPPUNIT_ASSERT( channel.isClosed() == false );
+ channel.close();
+ CPPUNIT_ASSERT( channel.isRunning() == false );
+ CPPUNIT_ASSERT( channel.isClosed() == true );
+ channel.start();
+ CPPUNIT_ASSERT( channel.isRunning() == false );
+ CPPUNIT_ASSERT( channel.isClosed() == true );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessageDispatchChannelTest::testEnqueue() {
+
+ MessageDispatchChannel channel;
+ Pointer<MessageDispatch> dispatch1( new MessageDispatch() );
+ Pointer<MessageDispatch> dispatch2( new MessageDispatch() );
+
+ CPPUNIT_ASSERT( channel.isEmpty() == true );
+ CPPUNIT_ASSERT( channel.size() == 0 );
+
+ channel.enqueue( dispatch1 );
+
+ CPPUNIT_ASSERT( channel.isEmpty() == false );
+ CPPUNIT_ASSERT( channel.size() == 1 );
+
+ channel.enqueue( dispatch2 );
+
+ CPPUNIT_ASSERT( channel.isEmpty() == false );
+ CPPUNIT_ASSERT( channel.size() == 2 );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessageDispatchChannelTest::testEnqueueFront() {
+
+ MessageDispatchChannel channel;
+ Pointer<MessageDispatch> dispatch1( new MessageDispatch() );
+ Pointer<MessageDispatch> dispatch2( new MessageDispatch() );
+
+ channel.start();
+
+ CPPUNIT_ASSERT( channel.isEmpty() == true );
+ CPPUNIT_ASSERT( channel.size() == 0 );
+
+ channel.enqueueFirst( dispatch1 );
+
+ CPPUNIT_ASSERT( channel.isEmpty() == false );
+ CPPUNIT_ASSERT( channel.size() == 1 );
+
+ channel.enqueueFirst( dispatch2 );
+
+ CPPUNIT_ASSERT( channel.isEmpty() == false );
+ CPPUNIT_ASSERT( channel.size() == 2 );
+
+ CPPUNIT_ASSERT( channel.dequeueNoWait() == dispatch2 );
+ CPPUNIT_ASSERT( channel.dequeueNoWait() == dispatch1 );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessageDispatchChannelTest::testPeek() {
+
+ MessageDispatchChannel channel;
+ Pointer<MessageDispatch> dispatch1( new MessageDispatch() );
+ Pointer<MessageDispatch> dispatch2( new MessageDispatch() );
+
+ CPPUNIT_ASSERT( channel.isEmpty() == true );
+ CPPUNIT_ASSERT( channel.size() == 0 );
+
+ channel.enqueueFirst( dispatch1 );
+
+ CPPUNIT_ASSERT( channel.isEmpty() == false );
+ CPPUNIT_ASSERT( channel.size() == 1 );
+
+ channel.enqueueFirst( dispatch2 );
+
+ CPPUNIT_ASSERT( channel.isEmpty() == false );
+ CPPUNIT_ASSERT( channel.size() == 2 );
+
+ CPPUNIT_ASSERT( channel.peek() == NULL );
+
+ channel.start();
+
+ CPPUNIT_ASSERT( channel.peek() == dispatch2 );
+ CPPUNIT_ASSERT( channel.dequeueNoWait() == dispatch2 );
+ CPPUNIT_ASSERT( channel.peek() == dispatch1 );
+ CPPUNIT_ASSERT( channel.dequeueNoWait() == dispatch1 );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessageDispatchChannelTest::testDequeueNoWait() {
+
+ MessageDispatchChannel channel;
+
+ Pointer<MessageDispatch> dispatch1( new MessageDispatch() );
+ Pointer<MessageDispatch> dispatch2( new MessageDispatch() );
+ Pointer<MessageDispatch> dispatch3( new MessageDispatch() );
+
+ CPPUNIT_ASSERT( channel.isRunning() == false );
+ CPPUNIT_ASSERT( channel.dequeueNoWait() == NULL );
+
+ channel.enqueue( dispatch1 );
+ channel.enqueue( dispatch2 );
+ channel.enqueue( dispatch3 );
+
+ CPPUNIT_ASSERT( channel.dequeueNoWait() == NULL );
+ channel.start();
+ CPPUNIT_ASSERT( channel.isRunning() == true );
+
+ CPPUNIT_ASSERT( channel.isEmpty() == false );
+ CPPUNIT_ASSERT( channel.size() == 3 );
+ CPPUNIT_ASSERT( channel.dequeueNoWait() == dispatch1 );
+ CPPUNIT_ASSERT( channel.dequeueNoWait() == dispatch2 );
+ CPPUNIT_ASSERT( channel.dequeueNoWait() == dispatch3 );
+
+ CPPUNIT_ASSERT( channel.size() == 0 );
+ CPPUNIT_ASSERT( channel.isEmpty() == true );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessageDispatchChannelTest::testDequeue() {
+
+ MessageDispatchChannel channel;
+
+ Pointer<MessageDispatch> dispatch1( new MessageDispatch() );
+ Pointer<MessageDispatch> dispatch2( new MessageDispatch() );
+ Pointer<MessageDispatch> dispatch3( new MessageDispatch() );
+
+ channel.start();
+ CPPUNIT_ASSERT( channel.isRunning() == true );
+
+ long long timeStarted = System::currentTimeMillis();
+
+ CPPUNIT_ASSERT( channel.dequeue( 1000 ) == NULL );
+
+ CPPUNIT_ASSERT( System::currentTimeMillis() - timeStarted >= 999 );
+
+ channel.enqueue( dispatch1 );
+ channel.enqueue( dispatch2 );
+ channel.enqueue( dispatch3 );
+ CPPUNIT_ASSERT( channel.isEmpty() == false );
+ CPPUNIT_ASSERT( channel.size() == 3 );
+ CPPUNIT_ASSERT( channel.dequeue( -1 ) == dispatch1 );
+ CPPUNIT_ASSERT( channel.dequeue( 0 ) == dispatch2 );
+ CPPUNIT_ASSERT( channel.dequeue( 1000 ) == dispatch3 );
+
+ CPPUNIT_ASSERT( channel.size() == 0 );
+ CPPUNIT_ASSERT( channel.isEmpty() == true );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessageDispatchChannelTest::testRemoveAll() {
+
+ MessageDispatchChannel channel;
+
+ Pointer<MessageDispatch> dispatch1( new MessageDispatch() );
+ Pointer<MessageDispatch> dispatch2( new MessageDispatch() );
+ Pointer<MessageDispatch> dispatch3( new MessageDispatch() );
+
+ channel.enqueue( dispatch1 );
+ channel.enqueue( dispatch2 );
+ channel.enqueue( dispatch3 );
+
+ channel.start();
+ CPPUNIT_ASSERT( channel.isRunning() == true );
+ CPPUNIT_ASSERT( channel.isEmpty() == false );
+ CPPUNIT_ASSERT( channel.size() == 3 );
+ CPPUNIT_ASSERT( channel.removeAll().size() == 3 );
+ CPPUNIT_ASSERT( channel.size() == 0 );
+ CPPUNIT_ASSERT( channel.isEmpty() == true );
+}
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/MessageDispatchChannelTest.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/MessageDispatchChannelTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/MessageDispatchChannelTest.h?rev=763046&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/MessageDispatchChannelTest.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/MessageDispatchChannelTest.h Tue Apr 7 23:59:07 2009
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CORE_MESSAGEDISPATCHCHANNELTEST_H_
+#define _ACTIVEMQ_CORE_MESSAGEDISPATCHCHANNELTEST_H_
+
+#include <cppunit/TestFixture.h>
+#include <cppunit/extensions/HelperMacros.h>
+
+namespace activemq {
+namespace core {
+
+ class MessageDispatchChannelTest : public CppUnit::TestFixture {
+
+ CPPUNIT_TEST_SUITE( MessageDispatchChannelTest );
+ CPPUNIT_TEST( testCtor );
+ CPPUNIT_TEST( testStart );
+ CPPUNIT_TEST( testStop );
+ CPPUNIT_TEST( testClose );
+ CPPUNIT_TEST( testEnqueue );
+ CPPUNIT_TEST( testEnqueueFront );
+ CPPUNIT_TEST( testPeek );
+ CPPUNIT_TEST( testDequeueNoWait );
+ CPPUNIT_TEST( testDequeue );
+ CPPUNIT_TEST( testRemoveAll );
+ CPPUNIT_TEST_SUITE_END();
+
+ public:
+
+ MessageDispatchChannelTest() {}
+ virtual ~MessageDispatchChannelTest() {}
+
+ void testCtor();
+ void testStart();
+ void testStop();
+ void testClose();
+ void testEnqueue();
+ void testEnqueueFront();
+ void testPeek();
+ void testDequeueNoWait();
+ void testDequeue();
+ void testRemoveAll();
+
+ };
+
+}}
+
+#endif /* _ACTIVEMQ_CORE_MESSAGEDISPATCHCHANNELTEST_H_ */
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/MessageDispatchChannelTest.h
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp?rev=763046&r1=763045&r2=763046&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp Tue Apr 7 23:59:07 2009
@@ -75,6 +75,8 @@
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::core::ActiveMQConnectionTest );
#include <activemq/core/ActiveMQSessionTest.h>
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::core::ActiveMQSessionTest );
+#include <activemq/core/MessageDispatchChannelTest.h>
+CPPUNIT_TEST_SUITE_REGISTRATION( activemq::core::MessageDispatchChannelTest );
#include <activemq/state/ConnectionStateTrackerTest.h>
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::state::ConnectionStateTrackerTest );