You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2008/11/26 16:53:31 UTC

svn commit: r720906 - in /activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire: OpenwireTransactionTest.cpp OpenwireTransactionTest.h

Author: tabish
Date: Wed Nov 26 07:53:30 2008
New Revision: 720906

URL: http://svn.apache.org/viewvc?rev=720906&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQCPP-209

Start of rewrite for Openwire Transaction Tests.

Modified:
    activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireTransactionTest.cpp
    activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireTransactionTest.h

Modified: activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireTransactionTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireTransactionTest.cpp?rev=720906&r1=720905&r2=720906&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireTransactionTest.cpp (original)
+++ activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireTransactionTest.cpp Wed Nov 26 07:53:30 2008
@@ -53,102 +53,129 @@
 #include <cms/TextMessage.h>
 #include <cms/MapMessage.h>
 
-using namespace activemq::connector::stomp;
-using namespace activemq::transport;
-using namespace activemq::util;
+#include <memory>
+
 using namespace std;
 using namespace cms;
 using namespace activemq;
+using namespace activemq::connector;
+using namespace activemq::transport;
 using namespace activemq::core;
 using namespace activemq::util;
-using namespace activemq::connector;
 using namespace activemq::exceptions;
 using namespace decaf::net;
-using namespace activemq::transport;
+using namespace decaf::util;
 using namespace decaf::util::concurrent;
 using namespace decaf::lang;
-using namespace decaf::util;
 
 using namespace integration;
+using namespace integration::connector;
 using namespace integration::connector::openwire;
 
+////////////////////////////////////////////////////////////////////////////////
 OpenwireTransactionTest::OpenwireTransactionTest() {
 }
 
-OpenwireTransactionTest::~OpenwireTransactionTest()
-{}
+////////////////////////////////////////////////////////////////////////////////
+OpenwireTransactionTest::~OpenwireTransactionTest() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireTransactionTest::setUp() {
+    testSupport = new TestSupport;
+    testSupport->initialize( IntegrationCommon::getInstance().getOpenwireURL(),
+                             cms::Session::SESSION_TRANSACTED);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireTransactionTest::tearDown() {
+    delete testSupport;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireTransactionTest::testSendReceiveTransactedBatches() {
 
-void OpenwireTransactionTest::test()
-{
     try
     {
-        if( IntegrationCommon::debug ) {
-            cout << "Starting activemqcms transactional test (sending "
-                 << IntegrationCommon::defaultMsgCount
-                 << " messages per type and sleeping "
-                 << IntegrationCommon::defaultDelay
-                << " milli-seconds) ...\n"
-                << endl;
-        }
-
-        // Create CMS Object for Comms
+        // Create CMS Object for Comms, this one is owned by the TestSupport
+        // class.
         cms::Session* session = testSupport->getSession();
-        cms::Topic* topic = session->createTopic("mytopic");
-        cms::MessageConsumer* consumer =
-            session->createConsumer( topic );
-        consumer->setMessageListener( testSupport );
-        cms::MessageProducer* producer =
-            session->createProducer( topic );
+
+        auto_ptr<cms::Topic> topic( session->createTopic("MYTOPIC") );
+        auto_ptr<cms::MessageConsumer> consumer( session->createConsumer( topic.get() ) );
+        auto_ptr<cms::MessageProducer> producer( session->createProducer( topic.get() ) );
+
         producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
 
-        // Send some text messages
-        testSupport->produceTextMessages(
-            *producer, IntegrationCommon::defaultMsgCount );
+        auto_ptr<TextMessage> message( session->createTextMessage( "Batch Message" ) );
 
-        session->commit();
+        for( int j = 0; j < batchCount; j++ ) {
 
-        // Send some messages.
-        testSupport->produceTextMessages(
-            *producer, IntegrationCommon::defaultMsgCount );
+            for( int i = 0; i < batchSize; i++ ) {
+                producer->send( message.get() );
+            }
 
-        session->commit();
+            session->commit();
 
-        // Wait till we get all the messages
-        testSupport->waitForMessages( IntegrationCommon::defaultMsgCount * 2 );
+            for( int i = 0; i < batchSize; i++ ) {
+                message.reset( dynamic_cast<TextMessage*>( consumer->receive( 1000 * 5 ) ) );
 
-        unsigned int numReceived = testSupport->getNumReceived();
-        if( IntegrationCommon::debug ) {
-            printf("received: %d\n", numReceived );
+                CPPUNIT_ASSERT_MESSAGE(
+                    "Failed to receive all messages in batch", message.get() != NULL );
+                CPPUNIT_ASSERT( string("Batch Message") == message->getText() );
+            }
+
+            session->commit();
         }
-        CPPUNIT_ASSERT(
-            numReceived == IntegrationCommon::defaultMsgCount * 2 );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireTransactionTest::testSendRollback() {
+
+    try
+    {
+        // Create CMS Object for Comms, this one is owned by the TestSupport
+        // class.
+        cms::Session* session = testSupport->getSession();
 
-        testSupport->setNumReceived( 0 );
+        auto_ptr<cms::Topic> topic( session->createTopic("MYTOPIC") );
+        auto_ptr<cms::MessageConsumer> consumer( session->createConsumer( topic.get() ) );
+        auto_ptr<cms::MessageProducer> producer( session->createProducer( topic.get() ) );
 
-        // Send some text messages
-        testSupport->produceTextMessages(
-            *producer, IntegrationCommon::defaultMsgCount );
+        producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
+
+        auto_ptr<TextMessage> outbound1( session->createTextMessage( "First Message" ) );
+        auto_ptr<TextMessage> outbound2( session->createTextMessage( "Second Message" ) );
 
+        // sends a message
+        producer->send( outbound1.get() );
+        session->commit();
+
+        // sends a message that gets rolled back
+        producer->send( session->createTextMessage( "I'm going to get rolled back." ) );
         session->rollback();
 
-        // Wait till we get all the messages
-        testSupport->waitForMessages( IntegrationCommon::defaultMsgCount );
+        // sends a message
+        producer->send( outbound2.get() );
+        session->commit();
+
+        // receives the first message
+        auto_ptr<TextMessage> inbound1(
+            dynamic_cast<TextMessage*>( consumer->receive( 1500 ) ) );
+
+        // receives the second message
+        auto_ptr<TextMessage> inbound2(
+            dynamic_cast<TextMessage*>( consumer->receive( 4000 ) ) );
 
-        numReceived = testSupport->getNumReceived();
-        if( IntegrationCommon::debug ) {
-            printf("received: %d\n", numReceived );
-        }
-        CPPUNIT_ASSERT(
-            numReceived == IntegrationCommon::defaultMsgCount );
+        // validates that the rolled-back message was not consumed
+        session->commit();
 
-        if( IntegrationCommon::debug ) {
-            printf("Shutting Down\n" );
-        }
-        delete producer;
-        delete consumer;
-        delete topic;
+        CPPUNIT_ASSERT( outbound1->getText() == inbound1->getText() );
+        CPPUNIT_ASSERT( outbound2->getText() == inbound2->getText() );
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCHALL_THROW( ActiveMQException )
 }
-

Modified: activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireTransactionTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireTransactionTest.h?rev=720906&r1=720905&r2=720906&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireTransactionTest.h (original)
+++ activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireTransactionTest.h Wed Nov 26 07:53:30 2008
@@ -30,25 +30,27 @@
     class OpenwireTransactionTest : public CppUnit::TestFixture
     {
         CPPUNIT_TEST_SUITE( OpenwireTransactionTest );
-        CPPUNIT_TEST( test );
+        CPPUNIT_TEST( testSendReceiveTransactedBatches );
+        CPPUNIT_TEST( testSendRollback );
         CPPUNIT_TEST_SUITE_END();
 
     private:
 
         TestSupport* testSupport;
 
+        static const int batchCount = 10;
+        static const int batchSize = 20;
+
     public:
 
         OpenwireTransactionTest();
         virtual ~OpenwireTransactionTest();
 
-        virtual void setUp() {
-            testSupport = new TestSupport;
-            testSupport->initialize(IntegrationCommon::getInstance().getOpenwireURL(), cms::Session::SESSION_TRANSACTED);
-        };
-        virtual void tearDown() { delete testSupport; };
+        virtual void setUp();
+        virtual void tearDown();
 
-        virtual void test();
+        virtual void testSendReceiveTransactedBatches();
+        virtual void testSendRollback();
 
     };