You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2009/04/08 01:59:08 UTC

svn commit: r763046 [2/2] - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/ main/activemq/core/ test/ test/activemq/core/

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.cpp?rev=763046&r1=763045&r2=763046&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.cpp Tue Apr  7 23:59:07 2009
@@ -41,31 +41,95 @@
 using namespace decaf::lang;
 
 ////////////////////////////////////////////////////////////////////////////////
+namespace activemq{
+namespace core{
+
+    class MyCMSMessageListener : public cms::MessageListener
+    {
+    public:
+
+        std::vector< Pointer<cms::Message> > messages;
+        decaf::util::concurrent::Mutex mutex;
+        bool ack;
+
+    public:
+
+        MyCMSMessageListener( bool ack = false ){
+            this->ack = ack;
+        }
+
+        virtual ~MyCMSMessageListener(){
+            clear();
+        }
+
+        virtual void setAck( bool ack ){
+            this->ack = ack;
+        }
+
+        virtual void clear() {
+            messages.clear();
+        }
+
+        virtual void onMessage( const cms::Message* message ) {
+
+            synchronized( &mutex ) {
+                if( ack ){
+                    message->acknowledge();
+                }
+
+                messages.push_back( Pointer<cms::Message>( message->clone() ) );
+                mutex.notifyAll();
+            }
+        }
+
+        void asyncWaitForMessages( unsigned int count ) {
+
+            try {
+
+                synchronized( &mutex ) {
+                    int stopAtZero = count + 5;
+
+                    while( messages.size() < count ) {
+                        mutex.wait( 750 );
+
+                        if( --stopAtZero == 0 ) {
+                            break;
+                        }
+                    }
+                }
+            }
+            AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
+            AMQ_CATCHALL_THROW( activemq::exceptions::ActiveMQException )
+        }
+    };
+}}
+
+////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSessionTest::testAutoAcking() {
 
     MyCMSMessageListener msgListener1;
     MyCMSMessageListener msgListener2;
 
-    CPPUNIT_ASSERT( connection != NULL );
+    CPPUNIT_ASSERT( connection.get() != NULL );
 
     // Create an Auto Ack Session
-    cms::Session* session = connection->createSession();
+    std::auto_ptr<cms::Session> session( connection->createSession() );
 
     // Create a Topic
-    cms::Topic* topic1 = session->createTopic( "TestTopic1" );
-    cms::Topic* topic2 = session->createTopic( "TestTopic2" );
+    std::auto_ptr<cms::Topic> topic1( session->createTopic( "TestTopic1" ) );
+    std::auto_ptr<cms::Topic> topic2( session->createTopic( "TestTopic2" ) );
 
-    CPPUNIT_ASSERT( topic1 != NULL );
-    CPPUNIT_ASSERT( topic2 != NULL );
+    CPPUNIT_ASSERT( topic1.get() != NULL );
+    CPPUNIT_ASSERT( topic2.get() != NULL );
 
     // Create a consumer
-    ActiveMQConsumer* consumer1 =
-        dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic1 ) );
-    ActiveMQConsumer* consumer2 =
-        dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic2 ) );
+    std::auto_ptr<ActiveMQConsumer> consumer1(
+        dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic1.get() ) ) );
+    std::auto_ptr<ActiveMQConsumer> consumer2(
+        dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic2.get() ) ) );
 
-    CPPUNIT_ASSERT( consumer1 != NULL );
-    CPPUNIT_ASSERT( consumer2 != NULL );
+    CPPUNIT_ASSERT( consumer1.get() != NULL );
+    CPPUNIT_ASSERT( consumer2.get() != NULL );
 
     CPPUNIT_ASSERT( consumer1->getMessageSelector() == "" );
     CPPUNIT_ASSERT( consumer2->getMessageSelector() == "" );
@@ -80,80 +144,53 @@
 
     injectTextMessage( "This is a Test 1" , *topic1, consumer1->getConsumerId() );
 
-    synchronized( &msgListener1.mutex )
-    {
-        if( msgListener1.messages.size() == 0 )
-        {
-            msgListener1.mutex.wait( 3000 );
-        }
-    }
+    msgListener1.asyncWaitForMessages( 1 );
 
     CPPUNIT_ASSERT( msgListener1.messages.size() == 1 );
 
     injectTextMessage( "This is a Test 2" , *topic2, consumer2->getConsumerId() );
 
-    synchronized( &msgListener2.mutex )
-    {
-        if( msgListener2.messages.size() == 0 )
-        {
-            msgListener2.mutex.wait( 3000 );
-        }
-    }
+    msgListener2.asyncWaitForMessages( 1 );
 
     CPPUNIT_ASSERT( msgListener2.messages.size() == 1 );
 
-    cms::TextMessage* msg1 =
-        dynamic_cast< cms::TextMessage* >(
-            msgListener1.messages[0] );
-    cms::TextMessage* msg2 =
-        dynamic_cast< cms::TextMessage* >(
-            msgListener2.messages[0] );
-
-    CPPUNIT_ASSERT( msg1 != NULL );
-    CPPUNIT_ASSERT( msg2 != NULL );
+    Pointer<cms::TextMessage> msg1 = msgListener1.messages[0].dynamicCast<cms::TextMessage>();
+    Pointer<cms::TextMessage> msg2 = msgListener2.messages[0].dynamicCast<cms::TextMessage>();
 
     std::string text1 = msg1->getText();
     std::string text2 = msg2->getText();
 
     CPPUNIT_ASSERT( text1 == "This is a Test 1" );
     CPPUNIT_ASSERT( text2 == "This is a Test 2" );
-
-    delete topic1;
-    delete topic2;
-
-    delete consumer1;
-    delete consumer2;
-
-    delete session;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSessionTest::testClientAck()
-{
+void ActiveMQSessionTest::testClientAck() {
+
     MyCMSMessageListener msgListener1( true );
     MyCMSMessageListener msgListener2( true );
 
-    CPPUNIT_ASSERT( connection != NULL );
+    CPPUNIT_ASSERT( connection.get() != NULL );
 
-    // Create an Auto Ack Session
-    cms::Session* session = connection->createSession(
-        cms::Session::CLIENT_ACKNOWLEDGE );
+    // Create an Client Ack Session
+    std::auto_ptr<cms::Session> session(
+        connection->createSession( cms::Session::CLIENT_ACKNOWLEDGE ) );
 
     // Create a Topic
-    cms::Topic* topic1 = session->createTopic( "TestTopic1");
-    cms::Topic* topic2 = session->createTopic( "TestTopic2");
+    std::auto_ptr<cms::Topic> topic1( session->createTopic( "TestTopic1" ) );
+    std::auto_ptr<cms::Topic> topic2( session->createTopic( "TestTopic2" ) );
 
-    CPPUNIT_ASSERT( topic1 != NULL );
-    CPPUNIT_ASSERT( topic2 != NULL );
+    CPPUNIT_ASSERT( topic1.get() != NULL );
+    CPPUNIT_ASSERT( topic2.get() != NULL );
 
     // Create a consumer
-    ActiveMQConsumer* consumer1 =
-        dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic1 ) );
-    ActiveMQConsumer* consumer2 =
-        dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic2 ) );
+    std::auto_ptr<ActiveMQConsumer> consumer1(
+        dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic1.get() ) ) );
+    std::auto_ptr<ActiveMQConsumer> consumer2(
+        dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic2.get() ) ) );
 
-    CPPUNIT_ASSERT( consumer1 != NULL );
-    CPPUNIT_ASSERT( consumer2 != NULL );
+    CPPUNIT_ASSERT( consumer1.get() != NULL );
+    CPPUNIT_ASSERT( consumer2.get() != NULL );
 
     CPPUNIT_ASSERT( consumer1->getMessageSelector() == "" );
     CPPUNIT_ASSERT( consumer2->getMessageSelector() == "" );
@@ -168,13 +205,7 @@
 
     injectTextMessage( "This is a Test 1" , *topic1, consumer1->getConsumerId() );
 
-    synchronized( &msgListener1.mutex )
-    {
-        if( msgListener1.messages.size() == 0 )
-        {
-            msgListener1.mutex.wait( 3000 );
-        }
-    }
+    msgListener1.asyncWaitForMessages( 1 );
 
     CPPUNIT_ASSERT_EQUAL( 1, (int)msgListener1.messages.size() );
 
@@ -182,70 +213,99 @@
 
     injectTextMessage( "This is a Test 2" , *topic2, consumer2->getConsumerId() );
 
-    synchronized( &msgListener2.mutex )
-    {
-        if( msgListener2.messages.size() == 0 )
-        {
-            msgListener2.mutex.wait( 3000 );
-        }
-    }
+    msgListener2.asyncWaitForMessages( 1 );
 
     CPPUNIT_ASSERT_EQUAL( 1, (int)msgListener2.messages.size() );
 
     msgListener2.messages[0]->acknowledge();
 
-    cms::TextMessage* msg1 =
-        dynamic_cast< cms::TextMessage* >(
-            msgListener1.messages[0] );
-    cms::TextMessage* msg2 =
-        dynamic_cast< cms::TextMessage* >(
-            msgListener2.messages[0] );
-
-    CPPUNIT_ASSERT( msg1 != NULL );
-    CPPUNIT_ASSERT( msg2 != NULL );
+    Pointer<cms::TextMessage> msg1 = msgListener1.messages[0].dynamicCast<cms::TextMessage>();
+    Pointer<cms::TextMessage> msg2 = msgListener2.messages[0].dynamicCast<cms::TextMessage>();
 
     std::string text1 = msg1->getText();
     std::string text2 = msg2->getText();
 
     CPPUNIT_ASSERT( text1 == "This is a Test 1" );
     CPPUNIT_ASSERT( text2 == "This is a Test 2" );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionTest::testTransactionCommitOneConsumer() {
+
+    static const int MSG_COUNT = 50;
+
+    MyCMSMessageListener msgListener1;
+
+    CPPUNIT_ASSERT( connection.get() != NULL );
 
-    delete topic1;
-    delete topic2;
+    // Create an Transacted Session
+    std::auto_ptr<cms::Session> session(
+        connection->createSession( cms::Session::SESSION_TRANSACTED ) );
 
-    delete consumer1;
-    delete consumer2;
+    // Create a Topic
+    std::auto_ptr<cms::Topic> topic1( session->createTopic( "TestTopic1" ) );
+
+    CPPUNIT_ASSERT( topic1.get() != NULL );
+
+    // Create a consumer
+    std::auto_ptr<ActiveMQConsumer> consumer1(
+        dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic1.get() ) ) );
+
+    CPPUNIT_ASSERT( consumer1.get() != NULL );
+
+    CPPUNIT_ASSERT( consumer1->getMessageSelector() == "" );
+
+    CPPUNIT_ASSERT( consumer1->receiveNoWait() == NULL );
+    CPPUNIT_ASSERT( consumer1->receive( 5 ) == NULL );
+
+    consumer1->setMessageListener( &msgListener1 );
+
+    for( int i = 0; i < MSG_COUNT; ++i ) {
+        injectTextMessage( "This is a Test 1" , *topic1, consumer1->getConsumerId() );
+    }
+
+    msgListener1.asyncWaitForMessages( MSG_COUNT );
 
-    delete session;
+    CPPUNIT_ASSERT_EQUAL( MSG_COUNT, (int)msgListener1.messages.size() );
+
+    session->commit();
+
+    Pointer<cms::TextMessage> msg1 = msgListener1.messages[0].dynamicCast<cms::TextMessage>();
+
+    std::string text1 = msg1->getText();
+
+    CPPUNIT_ASSERT( text1 == "This is a Test 1" );
+
+    msgListener1.clear();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSessionTest::testTransactional()
-{
+void ActiveMQSessionTest::testTransactionCommitTwoConsumer() {
+
     MyCMSMessageListener msgListener1;
     MyCMSMessageListener msgListener2;
 
-    CPPUNIT_ASSERT( connection != NULL );
+    CPPUNIT_ASSERT( connection.get() != NULL );
 
     // Create an Auto Ack Session
-    cms::Session* session = connection->createSession(
-        cms::Session::SESSION_TRANSACTED );
+    std::auto_ptr<cms::Session> session(
+        connection->createSession( cms::Session::SESSION_TRANSACTED ) );
 
     // Create a Topic
-    cms::Topic* topic1 = session->createTopic( "TestTopic1");
-    cms::Topic* topic2 = session->createTopic( "TestTopic2");
+    std::auto_ptr<cms::Topic> topic1( session->createTopic( "TestTopic1" ) );
+    std::auto_ptr<cms::Topic> topic2( session->createTopic( "TestTopic2" ) );
 
-    CPPUNIT_ASSERT( topic1 != NULL );
-    CPPUNIT_ASSERT( topic2 != NULL );
+    CPPUNIT_ASSERT( topic1.get() != NULL );
+    CPPUNIT_ASSERT( topic2.get() != NULL );
 
     // Create a consumer
-    ActiveMQConsumer* consumer1 =
-        dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic1 ) );
-    ActiveMQConsumer* consumer2 =
-        dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic2 ) );
+    std::auto_ptr<ActiveMQConsumer> consumer1(
+        dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic1.get() ) ) );
+    std::auto_ptr<ActiveMQConsumer> consumer2(
+        dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic2.get() ) ) );
 
-    CPPUNIT_ASSERT( consumer1 != NULL );
-    CPPUNIT_ASSERT( consumer2 != NULL );
+    CPPUNIT_ASSERT( consumer1.get() != NULL );
+    CPPUNIT_ASSERT( consumer2.get() != NULL );
 
     CPPUNIT_ASSERT( consumer1->getMessageSelector() == "" );
     CPPUNIT_ASSERT( consumer2->getMessageSelector() == "" );
@@ -260,41 +320,20 @@
 
     injectTextMessage( "This is a Test 1" , *topic1, consumer1->getConsumerId() );
 
-    synchronized( &msgListener1.mutex )
-    {
-        if( msgListener1.messages.size() == 0 )
-        {
-            msgListener1.mutex.wait( 3000 );
-        }
-    }
+    msgListener1.asyncWaitForMessages( 1 );
 
     CPPUNIT_ASSERT_EQUAL( 1, (int)msgListener1.messages.size() );
 
-    session->commit();
-
     injectTextMessage( "This is a Test 2" , *topic2, consumer2->getConsumerId() );
 
-    synchronized( &msgListener2.mutex )
-    {
-        if( msgListener2.messages.size() == 0 )
-        {
-            msgListener2.mutex.wait( 3000 );
-        }
-    }
+    msgListener2.asyncWaitForMessages( 1 );
 
     CPPUNIT_ASSERT( msgListener2.messages.size() == 1 );
 
     session->commit();
 
-    cms::TextMessage* msg1 =
-        dynamic_cast< cms::TextMessage* >(
-            msgListener1.messages[0] );
-    cms::TextMessage* msg2 =
-        dynamic_cast< cms::TextMessage* >(
-            msgListener2.messages[0] );
-
-    CPPUNIT_ASSERT( msg1 != NULL );
-    CPPUNIT_ASSERT( msg2 != NULL );
+    Pointer<cms::TextMessage> msg1 = msgListener1.messages[0].dynamicCast<cms::TextMessage>();
+    Pointer<cms::TextMessage> msg2 = msgListener2.messages[0].dynamicCast<cms::TextMessage>();
 
     std::string text1 = msg1->getText();
     std::string text2 = msg2->getText();
@@ -304,57 +343,117 @@
 
     msgListener1.clear();
     msgListener2.clear();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionTest::testTransactionRollbackOneConsumer() {
+
+    MyCMSMessageListener msgListener1;
+
+    CPPUNIT_ASSERT( connection.get() != NULL );
+
+    std::auto_ptr<cms::Session> session(
+        connection->createSession( cms::Session::SESSION_TRANSACTED ) );
+
+    // Create a Topic
+    std::auto_ptr<cms::Topic> topic1( session->createTopic( "TestTopic1" ) );
+
+    CPPUNIT_ASSERT( topic1.get() != NULL );
+
+    // Create a consumer
+    std::auto_ptr<ActiveMQConsumer> consumer1(
+        dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic1.get() ) ) );
+
+    CPPUNIT_ASSERT( consumer1.get() != NULL );
+
+    CPPUNIT_ASSERT( consumer1->getMessageSelector() == "" );
+
+    CPPUNIT_ASSERT( consumer1->receiveNoWait() == NULL );
+    CPPUNIT_ASSERT( consumer1->receive( 5 ) == NULL );
+
+    consumer1->setMessageListener( &msgListener1 );
 
     const unsigned int msgCount = 50;
 
-    for( unsigned int i = 0; i < msgCount; ++i )
-    {
+    for( unsigned int i = 0; i < msgCount; ++i ) {
         std::ostringstream stream;
-
         stream << "This is test message #" << i << std::ends;
-
         injectTextMessage( stream.str() , *topic1, consumer1->getConsumerId() );
     }
 
-    for( unsigned int i = 0; i < msgCount; ++i )
-    {
-        std::ostringstream stream;
-
-        stream << "This is test message #" << i << std::ends;
+    msgListener1.asyncWaitForMessages( msgCount );
 
-        injectTextMessage( stream.str() , *topic2, consumer2->getConsumerId() );
-    }
+    CPPUNIT_ASSERT( msgListener1.messages.size() == msgCount );
 
-    synchronized( &msgListener1.mutex )
-    {
-        const unsigned int interval = msgCount + 10;
-        unsigned int count = 0;
+    msgListener1.clear();
 
-        while( msgListener1.messages.size() != msgCount &&
-               count < interval )
-        {
-            msgListener1.mutex.wait( 3000 );
+    session->rollback();
 
-            ++count;
-        }
-    }
+    msgListener1.asyncWaitForMessages( msgCount );
 
     CPPUNIT_ASSERT( msgListener1.messages.size() == msgCount );
 
-    synchronized( &msgListener2.mutex )
-    {
-        const int interval = msgCount + 10;
-        int count = 0;
+    session->commit();
+}
 
-        while( msgListener2.messages.size() != msgCount &&
-               count < interval )
-        {
-            msgListener2.mutex.wait( 3000 );
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionTest::testTransactionRollbackTwoConsumer() {
 
-            ++count;
-        }
+    MyCMSMessageListener msgListener1;
+    MyCMSMessageListener msgListener2;
+
+    CPPUNIT_ASSERT( connection.get() != NULL );
+
+    std::auto_ptr<cms::Session> session(
+        connection->createSession( cms::Session::SESSION_TRANSACTED ) );
+
+    // Create a Topic
+    std::auto_ptr<cms::Topic> topic1( session->createTopic( "TestTopic1" ) );
+    std::auto_ptr<cms::Topic> topic2( session->createTopic( "TestTopic2" ) );
+
+    CPPUNIT_ASSERT( topic1.get() != NULL );
+    CPPUNIT_ASSERT( topic2.get() != NULL );
+
+    // Create a consumer
+    std::auto_ptr<ActiveMQConsumer> consumer1(
+        dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic1.get() ) ) );
+    std::auto_ptr<ActiveMQConsumer> consumer2(
+        dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic2.get() ) ) );
+
+    CPPUNIT_ASSERT( consumer1.get() != NULL );
+    CPPUNIT_ASSERT( consumer2.get() != NULL );
+
+    CPPUNIT_ASSERT( consumer1->getMessageSelector() == "" );
+    CPPUNIT_ASSERT( consumer2->getMessageSelector() == "" );
+
+    CPPUNIT_ASSERT( consumer1->receiveNoWait() == NULL );
+    CPPUNIT_ASSERT( consumer1->receive( 5 ) == NULL );
+    CPPUNIT_ASSERT( consumer2->receiveNoWait() == NULL );
+    CPPUNIT_ASSERT( consumer2->receive( 5 ) == NULL );
+
+    consumer1->setMessageListener( &msgListener1 );
+    consumer2->setMessageListener( &msgListener2 );
+
+    const unsigned int msgCount = 50;
+
+    for( unsigned int i = 0; i < msgCount; ++i ) {
+        std::ostringstream stream;
+        stream << "This is test message #" << i << std::ends;
+        injectTextMessage( stream.str() , *topic1, consumer1->getConsumerId() );
     }
 
+    for( unsigned int i = 0; i < msgCount; ++i ) {
+        std::ostringstream stream;
+        stream << "This is test message #" << i << std::ends;
+        injectTextMessage( stream.str() , *topic2, consumer2->getConsumerId() );
+    }
+
+    msgListener1.asyncWaitForMessages( msgCount );
+
+    CPPUNIT_ASSERT( msgListener1.messages.size() == msgCount );
+
+    msgListener2.asyncWaitForMessages( msgCount );
+
     CPPUNIT_ASSERT( msgListener2.messages.size() == msgCount );
 
     msgListener1.clear();
@@ -362,73 +461,93 @@
 
     session->rollback();
 
-    synchronized( &msgListener1.mutex )
-    {
-        const int interval = msgCount + 10;
-        int count = 0;
+    msgListener1.asyncWaitForMessages( msgCount );
 
-        while( msgListener1.messages.size() != msgCount &&
-               count < interval )
-        {
-            msgListener1.mutex.wait( 3000 );
+    CPPUNIT_ASSERT( msgListener1.messages.size() == msgCount );
 
-            ++count;
-        }
-    }
+    msgListener2.asyncWaitForMessages( msgCount );
 
-    CPPUNIT_ASSERT( msgListener1.messages.size() == msgCount );
+    CPPUNIT_ASSERT( msgListener2.messages.size() == msgCount );
 
-    synchronized( &msgListener2.mutex )
-    {
-        const int interval = msgCount + 10;
-        int count = 0;
+    session->commit();
+}
 
-        while( msgListener2.messages.size() != msgCount &&
-               count < interval )
-        {
-            msgListener2.mutex.wait( 3000 );
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionTest::testTransactionCloseWithoutCommit() {
 
-            ++count;
-        }
+    static const int MSG_COUNT = 50;
+
+    MyCMSMessageListener msgListener1;
+
+    CPPUNIT_ASSERT( connection.get() != NULL );
+
+    // Create an Transacted Session
+    std::auto_ptr<cms::Session> session(
+        connection->createSession( cms::Session::SESSION_TRANSACTED ) );
+
+    // Create a Topic
+    std::auto_ptr<cms::Topic> topic1( session->createTopic( "TestTopic1" ) );
+
+    CPPUNIT_ASSERT( topic1.get() != NULL );
+
+    // Create a consumer
+    std::auto_ptr<ActiveMQConsumer> consumer1(
+        dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic1.get() ) ) );
+
+    CPPUNIT_ASSERT( consumer1.get() != NULL );
+
+    CPPUNIT_ASSERT( consumer1->getMessageSelector() == "" );
+
+    CPPUNIT_ASSERT( consumer1->receiveNoWait() == NULL );
+    CPPUNIT_ASSERT( consumer1->receive( 5 ) == NULL );
+
+    consumer1->setMessageListener( &msgListener1 );
+
+    for( int i = 0; i < MSG_COUNT; ++i ) {
+        injectTextMessage( "This is a Test 1" , *topic1, consumer1->getConsumerId() );
     }
 
-    CPPUNIT_ASSERT( msgListener2.messages.size() == msgCount );
+    msgListener1.asyncWaitForMessages( MSG_COUNT );
 
-    delete topic1;
-    delete topic2;
+    CPPUNIT_ASSERT_EQUAL( MSG_COUNT, (int)msgListener1.messages.size() );
 
-    delete consumer1;
-    delete consumer2;
+    // This is what we are testing, since there was no commit, the session
+    // will rollback the transaction when this are closed.
+    // session->commit();
 
-    delete session;
+    // TODO - We should be able to close the consumer but we don't have a way
+    // to keep the consumer alive if deleted after calling close so the session
+    // segfaults on attempting to rollback the transaction and visiting all its
+    // registered Synchronizations.
+    session->close();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSessionTest::testExpiration()
-{
+void ActiveMQSessionTest::testExpiration() {
+
     MyCMSMessageListener msgListener1;
     MyCMSMessageListener msgListener2;
 
-    CPPUNIT_ASSERT( connection != NULL );
+    CPPUNIT_ASSERT( connection.get() != NULL );
 
     // Create an Auto Ack Session
-    cms::Session* session = connection->createSession();
+    std::auto_ptr<cms::Session> session( connection->createSession() );
 
     // Create a Topic
-    cms::Topic* topic1 = session->createTopic( "TestTopic1");
-    cms::Topic* topic2 = session->createTopic( "TestTopic2");
+    std::auto_ptr<cms::Topic> topic1( session->createTopic( "TestTopic1" ) );
+    std::auto_ptr<cms::Topic> topic2( session->createTopic( "TestTopic2" ) );
 
-    CPPUNIT_ASSERT( topic1 != NULL );
-    CPPUNIT_ASSERT( topic2 != NULL );
+    CPPUNIT_ASSERT( topic1.get() != NULL );
+    CPPUNIT_ASSERT( topic2.get() != NULL );
 
     // Create a consumer
-    ActiveMQConsumer* consumer1 =
-        dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic1 ) );
-    ActiveMQConsumer* consumer2 =
-        dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic2 ) );
+    std::auto_ptr<ActiveMQConsumer> consumer1(
+        dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic1.get() ) ) );
+    std::auto_ptr<ActiveMQConsumer> consumer2(
+        dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic2.get() ) ) );
 
-    CPPUNIT_ASSERT( consumer1 != NULL );
-    CPPUNIT_ASSERT( consumer2 != NULL );
+    CPPUNIT_ASSERT( consumer1.get() != NULL );
+    CPPUNIT_ASSERT( consumer2.get() != NULL );
 
     consumer1->setMessageListener( &msgListener1 );
     consumer2->setMessageListener( &msgListener2 );
@@ -439,13 +558,7 @@
                        decaf::util::Date::getCurrentTimeMilliseconds(),
                        50 );
 
-    synchronized( &msgListener1.mutex )
-    {
-        if( msgListener1.messages.size() == 0 )
-        {
-            msgListener1.mutex.wait( 3000 );
-        }
-    }
+    msgListener1.asyncWaitForMessages( 1 );
 
     CPPUNIT_ASSERT( msgListener1.messages.size() == 1 );
 
@@ -455,33 +568,15 @@
                        decaf::util::Date::getCurrentTimeMilliseconds() - 100,
                        1 );
 
-    synchronized( &msgListener2.mutex )
-    {
-        if( msgListener2.messages.size() == 0 )
-        {
-            msgListener2.mutex.wait( 100 );
-        }
-    }
+    msgListener2.asyncWaitForMessages( 1 );
 
     CPPUNIT_ASSERT( msgListener2.messages.size() == 0 );
 
-    cms::TextMessage* msg1 =
-        dynamic_cast< cms::TextMessage* >(
-            msgListener1.messages[0] );
-
-    CPPUNIT_ASSERT( msg1 != NULL );
+    Pointer<cms::TextMessage> msg1 = msgListener1.messages[0].dynamicCast<cms::TextMessage>();
 
     std::string text1 = msg1->getText();
 
     CPPUNIT_ASSERT( text1 == "This is a Test 1" );
-
-    delete topic1;
-    delete topic2;
-
-    delete consumer1;
-    delete consumer2;
-
-    delete session;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -491,11 +586,11 @@
     {
         ActiveMQConnectionFactory factory("mock://127.0.0.1:12345?wireFormat=openwire");
 
-        connection = dynamic_cast< ActiveMQConnection*>(
-            factory.createConnection() );
+        connection.reset( dynamic_cast< ActiveMQConnection*>( factory.createConnection() ) );
 
-        // Get the Transport and make sure we got a dummy Transport
-        dTransport = transport::mock::MockTransport::getInstance();
+        // Get a pointer to the Mock Transport for Message injection.
+        dTransport = dynamic_cast<transport::mock::MockTransport*>(
+            connection->getTransport().narrow( typeid( transport::mock::MockTransport ) ) );
         CPPUNIT_ASSERT( dTransport != NULL );
 
         connection->setExceptionListener( &exListener );
@@ -511,7 +606,7 @@
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSessionTest::tearDown() {
-    delete connection;
+    connection.reset( NULL );
 }
 
 ////////////////////////////////////////////////////////////////////////////////

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h?rev=763046&r1=763045&r2=763046&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h Tue Apr  7 23:59:07 2009
@@ -30,6 +30,7 @@
 #include <activemq/transport/mock/MockTransport.h>
 #include <activemq/util/Config.h>
 #include <activemq/commands/ConsumerId.h>
+#include <memory>
 
 namespace activemq{
 namespace core{
@@ -39,7 +40,11 @@
         CPPUNIT_TEST_SUITE( ActiveMQSessionTest );
         CPPUNIT_TEST( testAutoAcking );
         CPPUNIT_TEST( testClientAck );
-        //CPPUNIT_TEST( testTransactional );
+        CPPUNIT_TEST( testTransactionCommitOneConsumer );
+        CPPUNIT_TEST( testTransactionCommitTwoConsumer );
+        CPPUNIT_TEST( testTransactionRollbackOneConsumer );
+        CPPUNIT_TEST( testTransactionRollbackTwoConsumer );
+        CPPUNIT_TEST( testTransactionCloseWithoutCommit );
         CPPUNIT_TEST( testExpiration );
         CPPUNIT_TEST_SUITE_END();
 
@@ -60,56 +65,7 @@
             }
         };
 
-        class MyCMSMessageListener : public cms::MessageListener
-        {
-        public:
-
-            std::vector<cms::Message*> messages;
-            decaf::util::concurrent::Mutex mutex;
-            bool ack;
-
-        public:
-
-            MyCMSMessageListener( bool ack = false ){
-                this->ack = ack;
-            }
-
-            virtual ~MyCMSMessageListener(){
-                clear();
-            }
-
-            virtual void setAck( bool ack ){
-                this->ack = ack;
-            }
-
-            virtual void clear() {
-                std::vector<cms::Message*>::iterator itr =
-                    messages.begin();
-
-                for( ; itr != messages.end(); ++itr )
-                {
-                    delete *itr;
-                }
-
-                messages.clear();
-            }
-
-            virtual void onMessage( const cms::Message* message )
-            {
-                synchronized( &mutex )
-                {
-                    if( ack ){
-                        message->acknowledge();
-                    }
-
-                    messages.push_back( message->clone() );
-
-                    mutex.notifyAll();
-                }
-            }
-        };
-
-        ActiveMQConnection* connection;
+        std::auto_ptr<ActiveMQConnection> connection;
         transport::mock::MockTransport* dTransport;
         MyExceptionListener exListener;
 
@@ -130,7 +86,11 @@
 
         void testAutoAcking();
         void testClientAck();
-        void testTransactional();
+        void testTransactionCommitOneConsumer();
+        void testTransactionCommitTwoConsumer();
+        void testTransactionRollbackOneConsumer();
+        void testTransactionRollbackTwoConsumer();
+        void testTransactionCloseWithoutCommit();
         void testExpiration();
 
     };

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/MessageDispatchChannelTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/MessageDispatchChannelTest.cpp?rev=763046&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/MessageDispatchChannelTest.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/MessageDispatchChannelTest.cpp Tue Apr  7 23:59:07 2009
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "MessageDispatchChannelTest.h"
+
+#include <activemq/core/MessageDispatchChannel.h>
+#include <activemq/commands/MessageDispatch.h>
+#include <decaf/lang/Pointer.h>
+#include <decaf/lang/System.h>
+
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::commands;
+using namespace decaf;
+using namespace decaf::lang;
+
+////////////////////////////////////////////////////////////////////////////////
+void MessageDispatchChannelTest::testCtor() {
+
+    MessageDispatchChannel channel;
+    CPPUNIT_ASSERT( channel.isRunning() == false );
+    CPPUNIT_ASSERT( channel.isEmpty() == true );
+    CPPUNIT_ASSERT( channel.size() == 0 );
+    CPPUNIT_ASSERT( channel.isClosed() == false );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessageDispatchChannelTest::testStart() {
+
+    MessageDispatchChannel channel;
+    channel.start();
+    CPPUNIT_ASSERT( channel.isRunning() == true );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessageDispatchChannelTest::testStop() {
+
+    MessageDispatchChannel channel;
+    channel.start();
+    CPPUNIT_ASSERT( channel.isRunning() == true );
+    channel.stop();
+    CPPUNIT_ASSERT( channel.isRunning() == false );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessageDispatchChannelTest::testClose() {
+
+    MessageDispatchChannel channel;
+    channel.start();
+    CPPUNIT_ASSERT( channel.isRunning() == true );
+    CPPUNIT_ASSERT( channel.isClosed() == false );
+    channel.close();
+    CPPUNIT_ASSERT( channel.isRunning() == false );
+    CPPUNIT_ASSERT( channel.isClosed() == true );
+    channel.start();
+    CPPUNIT_ASSERT( channel.isRunning() == false );
+    CPPUNIT_ASSERT( channel.isClosed() == true );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessageDispatchChannelTest::testEnqueue() {
+
+    MessageDispatchChannel channel;
+    Pointer<MessageDispatch> dispatch1( new MessageDispatch() );
+    Pointer<MessageDispatch> dispatch2( new MessageDispatch() );
+
+    CPPUNIT_ASSERT( channel.isEmpty() == true );
+    CPPUNIT_ASSERT( channel.size() == 0 );
+
+    channel.enqueue( dispatch1 );
+
+    CPPUNIT_ASSERT( channel.isEmpty() == false );
+    CPPUNIT_ASSERT( channel.size() == 1 );
+
+    channel.enqueue( dispatch2 );
+
+    CPPUNIT_ASSERT( channel.isEmpty() == false );
+    CPPUNIT_ASSERT( channel.size() == 2 );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessageDispatchChannelTest::testEnqueueFront() {
+
+    MessageDispatchChannel channel;
+    Pointer<MessageDispatch> dispatch1( new MessageDispatch() );
+    Pointer<MessageDispatch> dispatch2( new MessageDispatch() );
+
+    channel.start();
+
+    CPPUNIT_ASSERT( channel.isEmpty() == true );
+    CPPUNIT_ASSERT( channel.size() == 0 );
+
+    channel.enqueueFirst( dispatch1 );
+
+    CPPUNIT_ASSERT( channel.isEmpty() == false );
+    CPPUNIT_ASSERT( channel.size() == 1 );
+
+    channel.enqueueFirst( dispatch2 );
+
+    CPPUNIT_ASSERT( channel.isEmpty() == false );
+    CPPUNIT_ASSERT( channel.size() == 2 );
+
+    CPPUNIT_ASSERT( channel.dequeueNoWait() == dispatch2 );
+    CPPUNIT_ASSERT( channel.dequeueNoWait() == dispatch1 );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessageDispatchChannelTest::testPeek() {
+
+    MessageDispatchChannel channel;
+    Pointer<MessageDispatch> dispatch1( new MessageDispatch() );
+    Pointer<MessageDispatch> dispatch2( new MessageDispatch() );
+
+    CPPUNIT_ASSERT( channel.isEmpty() == true );
+    CPPUNIT_ASSERT( channel.size() == 0 );
+
+    channel.enqueueFirst( dispatch1 );
+
+    CPPUNIT_ASSERT( channel.isEmpty() == false );
+    CPPUNIT_ASSERT( channel.size() == 1 );
+
+    channel.enqueueFirst( dispatch2 );
+
+    CPPUNIT_ASSERT( channel.isEmpty() == false );
+    CPPUNIT_ASSERT( channel.size() == 2 );
+
+    CPPUNIT_ASSERT( channel.peek() == NULL );
+
+    channel.start();
+
+    CPPUNIT_ASSERT( channel.peek() == dispatch2 );
+    CPPUNIT_ASSERT( channel.dequeueNoWait() == dispatch2 );
+    CPPUNIT_ASSERT( channel.peek() == dispatch1 );
+    CPPUNIT_ASSERT( channel.dequeueNoWait() == dispatch1 );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessageDispatchChannelTest::testDequeueNoWait() {
+
+    MessageDispatchChannel channel;
+
+    Pointer<MessageDispatch> dispatch1( new MessageDispatch() );
+    Pointer<MessageDispatch> dispatch2( new MessageDispatch() );
+    Pointer<MessageDispatch> dispatch3( new MessageDispatch() );
+
+    CPPUNIT_ASSERT( channel.isRunning() == false );
+    CPPUNIT_ASSERT( channel.dequeueNoWait() == NULL );
+
+    channel.enqueue( dispatch1 );
+    channel.enqueue( dispatch2 );
+    channel.enqueue( dispatch3 );
+
+    CPPUNIT_ASSERT( channel.dequeueNoWait() == NULL );
+    channel.start();
+    CPPUNIT_ASSERT( channel.isRunning() == true );
+
+    CPPUNIT_ASSERT( channel.isEmpty() == false );
+    CPPUNIT_ASSERT( channel.size() == 3 );
+    CPPUNIT_ASSERT( channel.dequeueNoWait() == dispatch1 );
+    CPPUNIT_ASSERT( channel.dequeueNoWait() == dispatch2 );
+    CPPUNIT_ASSERT( channel.dequeueNoWait() == dispatch3 );
+
+    CPPUNIT_ASSERT( channel.size() == 0 );
+    CPPUNIT_ASSERT( channel.isEmpty() == true );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessageDispatchChannelTest::testDequeue() {
+
+    MessageDispatchChannel channel;
+
+    Pointer<MessageDispatch> dispatch1( new MessageDispatch() );
+    Pointer<MessageDispatch> dispatch2( new MessageDispatch() );
+    Pointer<MessageDispatch> dispatch3( new MessageDispatch() );
+
+    channel.start();
+    CPPUNIT_ASSERT( channel.isRunning() == true );
+
+    long long timeStarted = System::currentTimeMillis();
+
+    CPPUNIT_ASSERT( channel.dequeue( 1000 ) == NULL );
+
+    CPPUNIT_ASSERT( System::currentTimeMillis() - timeStarted >= 999 );
+
+    channel.enqueue( dispatch1 );
+    channel.enqueue( dispatch2 );
+    channel.enqueue( dispatch3 );
+    CPPUNIT_ASSERT( channel.isEmpty() == false );
+    CPPUNIT_ASSERT( channel.size() == 3 );
+    CPPUNIT_ASSERT( channel.dequeue( -1 ) == dispatch1 );
+    CPPUNIT_ASSERT( channel.dequeue( 0 ) == dispatch2 );
+    CPPUNIT_ASSERT( channel.dequeue( 1000 ) == dispatch3 );
+
+    CPPUNIT_ASSERT( channel.size() == 0 );
+    CPPUNIT_ASSERT( channel.isEmpty() == true );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessageDispatchChannelTest::testRemoveAll() {
+
+    MessageDispatchChannel channel;
+
+    Pointer<MessageDispatch> dispatch1( new MessageDispatch() );
+    Pointer<MessageDispatch> dispatch2( new MessageDispatch() );
+    Pointer<MessageDispatch> dispatch3( new MessageDispatch() );
+
+    channel.enqueue( dispatch1 );
+    channel.enqueue( dispatch2 );
+    channel.enqueue( dispatch3 );
+
+    channel.start();
+    CPPUNIT_ASSERT( channel.isRunning() == true );
+    CPPUNIT_ASSERT( channel.isEmpty() == false );
+    CPPUNIT_ASSERT( channel.size() == 3 );
+    CPPUNIT_ASSERT( channel.removeAll().size() == 3 );
+    CPPUNIT_ASSERT( channel.size() == 0 );
+    CPPUNIT_ASSERT( channel.isEmpty() == true );
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/MessageDispatchChannelTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/MessageDispatchChannelTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/MessageDispatchChannelTest.h?rev=763046&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/MessageDispatchChannelTest.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/MessageDispatchChannelTest.h Tue Apr  7 23:59:07 2009
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CORE_MESSAGEDISPATCHCHANNELTEST_H_
+#define _ACTIVEMQ_CORE_MESSAGEDISPATCHCHANNELTEST_H_
+
+#include <cppunit/TestFixture.h>
+#include <cppunit/extensions/HelperMacros.h>
+
+namespace activemq {
+namespace core {
+
+    class MessageDispatchChannelTest : public CppUnit::TestFixture {
+
+        CPPUNIT_TEST_SUITE( MessageDispatchChannelTest );
+        CPPUNIT_TEST( testCtor );
+        CPPUNIT_TEST( testStart );
+        CPPUNIT_TEST( testStop );
+        CPPUNIT_TEST( testClose );
+        CPPUNIT_TEST( testEnqueue );
+        CPPUNIT_TEST( testEnqueueFront );
+        CPPUNIT_TEST( testPeek );
+        CPPUNIT_TEST( testDequeueNoWait );
+        CPPUNIT_TEST( testDequeue );
+        CPPUNIT_TEST( testRemoveAll );
+        CPPUNIT_TEST_SUITE_END();
+
+    public:
+
+        MessageDispatchChannelTest() {}
+        virtual ~MessageDispatchChannelTest() {}
+
+        void testCtor();
+        void testStart();
+        void testStop();
+        void testClose();
+        void testEnqueue();
+        void testEnqueueFront();
+        void testPeek();
+        void testDequeueNoWait();
+        void testDequeue();
+        void testRemoveAll();
+
+    };
+
+}}
+
+#endif /* _ACTIVEMQ_CORE_MESSAGEDISPATCHCHANNELTEST_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/MessageDispatchChannelTest.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp?rev=763046&r1=763045&r2=763046&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp Tue Apr  7 23:59:07 2009
@@ -75,6 +75,8 @@
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::core::ActiveMQConnectionTest );
 #include <activemq/core/ActiveMQSessionTest.h>
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::core::ActiveMQSessionTest );
+#include <activemq/core/MessageDispatchChannelTest.h>
+CPPUNIT_TEST_SUITE_REGISTRATION( activemq::core::MessageDispatchChannelTest );
 
 #include <activemq/state/ConnectionStateTrackerTest.h>
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::state::ConnectionStateTrackerTest );