You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2008/11/26 16:53:31 UTC
svn commit: r720906 - in /activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire: OpenwireTransactionTest.cpp OpenwireTransactionTest.h
Author: tabish
Date: Wed Nov 26 07:53:30 2008
New Revision: 720906
URL: http://svn.apache.org/viewvc?rev=720906&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQCPP-209
Start of rewrite for Openwire Transaction Tests.
Modified:
activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireTransactionTest.cpp
activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireTransactionTest.h
Modified: activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireTransactionTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireTransactionTest.cpp?rev=720906&r1=720905&r2=720906&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireTransactionTest.cpp (original)
+++ activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireTransactionTest.cpp Wed Nov 26 07:53:30 2008
@@ -53,102 +53,129 @@
#include <cms/TextMessage.h>
#include <cms/MapMessage.h>
-using namespace activemq::connector::stomp;
-using namespace activemq::transport;
-using namespace activemq::util;
+#include <memory>
+
using namespace std;
using namespace cms;
using namespace activemq;
+using namespace activemq::connector;
+using namespace activemq::transport;
using namespace activemq::core;
using namespace activemq::util;
-using namespace activemq::connector;
using namespace activemq::exceptions;
using namespace decaf::net;
-using namespace activemq::transport;
+using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace decaf::lang;
-using namespace decaf::util;
using namespace integration;
+using namespace integration::connector;
using namespace integration::connector::openwire;
+////////////////////////////////////////////////////////////////////////////////
OpenwireTransactionTest::OpenwireTransactionTest() {
}
-OpenwireTransactionTest::~OpenwireTransactionTest()
-{}
+////////////////////////////////////////////////////////////////////////////////
+OpenwireTransactionTest::~OpenwireTransactionTest() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireTransactionTest::setUp() {
+ testSupport = new TestSupport;
+ testSupport->initialize( IntegrationCommon::getInstance().getOpenwireURL(),
+ cms::Session::SESSION_TRANSACTED);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireTransactionTest::tearDown() {
+ delete testSupport;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireTransactionTest::testSendReceiveTransactedBatches() {
-void OpenwireTransactionTest::test()
-{
try
{
- if( IntegrationCommon::debug ) {
- cout << "Starting activemqcms transactional test (sending "
- << IntegrationCommon::defaultMsgCount
- << " messages per type and sleeping "
- << IntegrationCommon::defaultDelay
- << " milli-seconds) ...\n"
- << endl;
- }
-
- // Create CMS Object for Comms
+ // Create CMS Object for Comms, this one is owned by the TestSupport
+ // class.
cms::Session* session = testSupport->getSession();
- cms::Topic* topic = session->createTopic("mytopic");
- cms::MessageConsumer* consumer =
- session->createConsumer( topic );
- consumer->setMessageListener( testSupport );
- cms::MessageProducer* producer =
- session->createProducer( topic );
+
+ auto_ptr<cms::Topic> topic( session->createTopic("MYTOPIC") );
+ auto_ptr<cms::MessageConsumer> consumer( session->createConsumer( topic.get() ) );
+ auto_ptr<cms::MessageProducer> producer( session->createProducer( topic.get() ) );
+
producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
- // Send some text messages
- testSupport->produceTextMessages(
- *producer, IntegrationCommon::defaultMsgCount );
+ auto_ptr<TextMessage> message( session->createTextMessage( "Batch Message" ) );
- session->commit();
+ for( int j = 0; j < batchCount; j++ ) {
- // Send some messages.
- testSupport->produceTextMessages(
- *producer, IntegrationCommon::defaultMsgCount );
+ for( int i = 0; i < batchSize; i++ ) {
+ producer->send( message.get() );
+ }
- session->commit();
+ session->commit();
- // Wait till we get all the messages
- testSupport->waitForMessages( IntegrationCommon::defaultMsgCount * 2 );
+ for( int i = 0; i < batchSize; i++ ) {
+ message.reset( dynamic_cast<TextMessage*>( consumer->receive( 1000 * 5 ) ) );
- unsigned int numReceived = testSupport->getNumReceived();
- if( IntegrationCommon::debug ) {
- printf("received: %d\n", numReceived );
+ CPPUNIT_ASSERT_MESSAGE(
+ "Failed to receive all messages in batch", message.get() != NULL );
+ CPPUNIT_ASSERT( string("Batch Message") == message->getText() );
+ }
+
+ session->commit();
}
- CPPUNIT_ASSERT(
- numReceived == IntegrationCommon::defaultMsgCount * 2 );
+ }
+ AMQ_CATCH_RETHROW( ActiveMQException )
+ AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireTransactionTest::testSendRollback() {
+
+ try
+ {
+ // Create CMS Object for Comms, this one is owned by the TestSupport
+ // class.
+ cms::Session* session = testSupport->getSession();
- testSupport->setNumReceived( 0 );
+ auto_ptr<cms::Topic> topic( session->createTopic("MYTOPIC") );
+ auto_ptr<cms::MessageConsumer> consumer( session->createConsumer( topic.get() ) );
+ auto_ptr<cms::MessageProducer> producer( session->createProducer( topic.get() ) );
- // Send some text messages
- testSupport->produceTextMessages(
- *producer, IntegrationCommon::defaultMsgCount );
+ producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
+
+ auto_ptr<TextMessage> outbound1( session->createTextMessage( "First Message" ) );
+ auto_ptr<TextMessage> outbound2( session->createTextMessage( "Second Message" ) );
+ // sends a message
+ producer->send( outbound1.get() );
+ session->commit();
+
+ // sends a message that gets rolled back
+ producer->send( session->createTextMessage( "I'm going to get rolled back." ) );
session->rollback();
- // Wait till we get all the messages
- testSupport->waitForMessages( IntegrationCommon::defaultMsgCount );
+ // sends a message
+ producer->send( outbound2.get() );
+ session->commit();
+
+ // receives the first message
+ auto_ptr<TextMessage> inbound1(
+ dynamic_cast<TextMessage*>( consumer->receive( 1500 ) ) );
+
+ // receives the second message
+ auto_ptr<TextMessage> inbound2(
+ dynamic_cast<TextMessage*>( consumer->receive( 4000 ) ) );
- numReceived = testSupport->getNumReceived();
- if( IntegrationCommon::debug ) {
- printf("received: %d\n", numReceived );
- }
- CPPUNIT_ASSERT(
- numReceived == IntegrationCommon::defaultMsgCount );
+ // validates that the rolled-back message was not consumed
+ session->commit();
- if( IntegrationCommon::debug ) {
- printf("Shutting Down\n" );
- }
- delete producer;
- delete consumer;
- delete topic;
+ CPPUNIT_ASSERT( outbound1->getText() == inbound1->getText() );
+ CPPUNIT_ASSERT( outbound2->getText() == inbound2->getText() );
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
-
Modified: activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireTransactionTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireTransactionTest.h?rev=720906&r1=720905&r2=720906&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireTransactionTest.h (original)
+++ activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireTransactionTest.h Wed Nov 26 07:53:30 2008
@@ -30,25 +30,27 @@
class OpenwireTransactionTest : public CppUnit::TestFixture
{
CPPUNIT_TEST_SUITE( OpenwireTransactionTest );
- CPPUNIT_TEST( test );
+ CPPUNIT_TEST( testSendReceiveTransactedBatches );
+ CPPUNIT_TEST( testSendRollback );
CPPUNIT_TEST_SUITE_END();
private:
TestSupport* testSupport;
+ static const int batchCount = 10;
+ static const int batchSize = 20;
+
public:
OpenwireTransactionTest();
virtual ~OpenwireTransactionTest();
- virtual void setUp() {
- testSupport = new TestSupport;
- testSupport->initialize(IntegrationCommon::getInstance().getOpenwireURL(), cms::Session::SESSION_TRANSACTED);
- };
- virtual void tearDown() { delete testSupport; };
+ virtual void setUp();
+ virtual void tearDown();
- virtual void test();
+ virtual void testSendReceiveTransactedBatches();
+ virtual void testSendRollback();
};