You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/11/18 23:23:49 UTC

svn commit: r1036656 - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/activemq/core/ test-integration/activemq/test/openwire/

Author: tabish
Date: Thu Nov 18 22:23:49 2010
New Revision: 1036656

URL: http://svn.apache.org/viewvc?rev=1036656&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQCPP-329

More tests and an additional fix.

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireXATransactionsTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireXATransactionsTest.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp?rev=1036656&r1=1036655&r2=1036656&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp Thu Nov 18 22:23:49 2010
@@ -303,11 +303,6 @@ void ActiveMQTransactionContext::afterRo
 
 ////////////////////////////////////////////////////////////////////////////////
 const Pointer<TransactionId>& ActiveMQTransactionContext::getTransactionId() const {
-    if( this->context->transactionId == NULL ) {
-        throw decaf::lang::exceptions::InvalidStateException(
-            __FILE__, __LINE__, "Transaction Not Started." );
-    }
-
     return this->context->transactionId;
 }
 
@@ -590,6 +585,7 @@ void ActiveMQTransactionContext::end( co
 
             setXid( NULL );
         }
+
     } else {
         throw XAException( XAException::XAER_INVAL );
     }
@@ -694,17 +690,6 @@ void ActiveMQTransactionContext::setXid(
             } catch( CMSException& e ) {
                 throw toXAException( e );
             }
-
-//            // Add our self to the list of contexts that are interested in
-//            // post commit/rollback events.
-//            List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId);
-//            if (l == NULL) {
-//                l = new ArrayList<TransactionContext>(3);
-//                ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l);
-//                l.add(this);
-//            } else if (!l.contains(this)) {
-//                l.add(this);
-//            }
         }
 
         // remove the association currently in place.

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireXATransactionsTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireXATransactionsTest.cpp?rev=1036656&r1=1036655&r2=1036656&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireXATransactionsTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireXATransactionsTest.cpp Thu Nov 18 22:23:49 2010
@@ -21,8 +21,10 @@
 #include <activemq/core/ActiveMQXAConnection.h>
 #include <activemq/core/ActiveMQXASession.h>
 #include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/commands/XATransactionId.h>
 
 #include <decaf/lang/Thread.h>
+#include <decaf/util/UUID.h>
 
 #include <cms/ConnectionFactory.h>
 #include <cms/Connection.h>
@@ -41,12 +43,19 @@ using namespace cms;
 using namespace std;
 using namespace decaf;
 using namespace decaf::lang;
+using namespace decaf::util;
 using namespace activemq;
 using namespace activemq::core;
+using namespace activemq::commands;
+using namespace activemq::exceptions;
 using namespace activemq::test;
 using namespace activemq::test::openwire;
 
 ////////////////////////////////////////////////////////////////////////////////
+const int OpenwireXATransactionsTest::batchCount = 10;
+const int OpenwireXATransactionsTest::batchSize = 20;
+
+////////////////////////////////////////////////////////////////////////////////
 OpenwireXATransactionsTest::OpenwireXATransactionsTest() {
 }
 
@@ -115,6 +124,26 @@ void OpenwireXATransactionsTest::testCre
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void OpenwireXATransactionsTest::testGetXAResource() {
+
+    std::auto_ptr<XAConnectionFactory> factory(
+        XAConnectionFactory::createCMSXAConnectionFactory( getBrokerURL() ) );
+    CPPUNIT_ASSERT( factory.get() != NULL );
+
+    std::auto_ptr<XAConnection> connection( factory->createXAConnection() );
+    CPPUNIT_ASSERT( connection.get() != NULL );
+
+    std::auto_ptr<XASession> session( connection->createXASession() );
+    CPPUNIT_ASSERT( session.get() != NULL );
+
+    XAResource* xaResource = session->getXAResource();
+    CPPUNIT_ASSERT( xaResource != NULL );
+    CPPUNIT_ASSERT( xaResource->isSameRM( xaResource ) );
+    CPPUNIT_ASSERT_NO_THROW( xaResource->setTransactionTimeout( 10000 ) );
+    CPPUNIT_ASSERT( xaResource->getTransactionTimeout() == 0 );
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void OpenwireXATransactionsTest::testSendReceiveOutsideTX() {
 
     std::auto_ptr<XAConnectionFactory> factory(
@@ -127,20 +156,450 @@ void OpenwireXATransactionsTest::testSen
     std::auto_ptr<XASession> session( connection->createXASession() );
     CPPUNIT_ASSERT( session.get() != NULL );
 
+    ActiveMQXASession* amqXASession = dynamic_cast<ActiveMQXASession*>( session.get() );
+    CPPUNIT_ASSERT( amqXASession != NULL );
+
     std::auto_ptr<Destination> destination( session->createTemporaryQueue() );
     std::auto_ptr<MessageProducer> producer( session->createProducer( destination.get() ) );
     std::auto_ptr<MessageConsumer> consumer( session->createConsumer( destination.get() ) );
 
+    CPPUNIT_ASSERT( amqXASession->isAutoAcknowledge() == true );
+    CPPUNIT_ASSERT( amqXASession->isTransacted() == false );
+
     connection->start();
 
     for( int i = 0; i < 50; ++i ) {
-        std::auto_ptr<Message> message( session->createTextMessage( "TEST" ) );
+        std::auto_ptr<cms::Message> message( session->createTextMessage( "TEST" ) );
         producer->send( message.get() );
     }
 
     for( int i = 0; i < 50; ++i ) {
-        std::auto_ptr<Message> message( consumer->receive( 3000 ) );
+        std::auto_ptr<cms::Message> message( consumer->receive( 3000 ) );
         CPPUNIT_ASSERT( message.get() != NULL );
         CPPUNIT_ASSERT( dynamic_cast<TextMessage*>( message.get() ) != NULL );
     }
 }
+
+////////////////////////////////////////////////////////////////////////////////
+cms::Xid* OpenwireXATransactionsTest::createXid() const {
+
+    std::string branchQualStr = UUID::randomUUID().toString();
+    std::string globalTxIdStr = this->txIdGen.generateId();
+
+    std::vector<unsigned char> branchQual( branchQualStr.begin(), branchQualStr.end() );
+    std::vector<unsigned char> globalTxId( globalTxIdStr.begin(), globalTxIdStr.end() );
+
+    if( (int)branchQual.size() > Xid::MAXBQUALSIZE ) {
+        branchQual.resize( Xid::MAXBQUALSIZE );
+    }
+
+    if( (int)globalTxId.size() > Xid::MAXGTRIDSIZE ) {
+        globalTxId.resize( Xid::MAXGTRIDSIZE );
+    }
+
+    XATransactionId* id = new XATransactionId();
+
+    id->setFormatId( 0 );
+    id->setBranchQualifier( branchQual );
+    id->setGlobalTransactionId( globalTxId );
+
+    return id;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireXATransactionsTest::testSendReceiveTransactedBatches() {
+
+    std::auto_ptr<XAConnectionFactory> factory(
+        XAConnectionFactory::createCMSXAConnectionFactory( getBrokerURL() ) );
+    CPPUNIT_ASSERT( factory.get() != NULL );
+
+    std::auto_ptr<XAConnection> connection( factory->createXAConnection() );
+    CPPUNIT_ASSERT( connection.get() != NULL );
+
+    std::auto_ptr<XASession> session( connection->createXASession() );
+    CPPUNIT_ASSERT( session.get() != NULL );
+
+    std::auto_ptr<Destination> destination( session->createTemporaryQueue() );
+    std::auto_ptr<MessageProducer> producer( session->createProducer( destination.get() ) );
+    std::auto_ptr<MessageConsumer> consumer( session->createConsumer( destination.get() ) );
+
+    XAResource* xaResource = session->getXAResource();
+    CPPUNIT_ASSERT( xaResource != NULL );
+
+    connection->start();
+
+    for( int j = 0; j < batchCount; j++ ) {
+
+        std::auto_ptr<cms::Xid> txIdSend( this->createXid() );
+        xaResource->start( txIdSend.get(), 0 );
+
+        std::auto_ptr<TextMessage> message( session->createTextMessage( "Batch Message" ) );
+
+        for( int i = 0; i < batchSize; i++ ) {
+            CPPUNIT_ASSERT_NO_THROW_MESSAGE(
+                "Send should not throw an exception here.",
+                producer->send( message.get() ) );
+        }
+
+        CPPUNIT_ASSERT_NO_THROW_MESSAGE(
+             "Should not have thrown an Exception for xaResource->end",
+             xaResource->end( txIdSend.get(), XAResource::TMSUCCESS ) );
+        CPPUNIT_ASSERT_NO_THROW_MESSAGE(
+             "Should not have thrown an Exception for xaResource->prepare",
+             xaResource->prepare( txIdSend.get() ) );
+        CPPUNIT_ASSERT_NO_THROW_MESSAGE(
+             "Should not have thrown an Exception for xaResource->commit",
+             xaResource->commit( txIdSend.get(), false ) );
+
+        std::auto_ptr<cms::Xid> txIdRecv( this->createXid() );
+        xaResource->start( txIdRecv.get(), 0 );
+
+        for( int i = 0; i < batchSize; i++ ) {
+
+            CPPUNIT_ASSERT_NO_THROW_MESSAGE(
+                "Receive Shouldn't throw a Message here:",
+                message.reset( dynamic_cast<TextMessage*>( consumer->receive( 1000 * 5 ) ) ) );
+
+            CPPUNIT_ASSERT_MESSAGE(
+                "Failed to receive all messages in batch", message.get() != NULL );
+            CPPUNIT_ASSERT( string("Batch Message") == message->getText() );
+         }
+
+        CPPUNIT_ASSERT_NO_THROW_MESSAGE(
+             "Should not have thrown an Exception for xaResource->end",
+             xaResource->end( txIdRecv.get(), XAResource::TMSUCCESS ) );
+        CPPUNIT_ASSERT_NO_THROW_MESSAGE(
+             "Should not have thrown an Exception for xaResource->prepare",
+             xaResource->prepare( txIdRecv.get() ) );
+        CPPUNIT_ASSERT_NO_THROW_MESSAGE(
+             "Should not have thrown an Exception for xaResource->commit",
+             xaResource->commit( txIdRecv.get(), false ) );
+    }
+
+    std::auto_ptr<cms::Message> message;
+
+    CPPUNIT_ASSERT_NO_THROW_MESSAGE(
+        "Receive Shouldn't throw a Message here:",
+        message.reset( consumer->receive( 2000 ) ) );
+
+    CPPUNIT_ASSERT_MESSAGE(
+        "Unexpected Message Received after XA Batches all processed", message.get() == NULL );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireXATransactionsTest::testSendRollback() {
+
+    std::auto_ptr<XAConnectionFactory> factory(
+        XAConnectionFactory::createCMSXAConnectionFactory( getBrokerURL() ) );
+    CPPUNIT_ASSERT( factory.get() != NULL );
+
+    std::auto_ptr<XAConnection> connection( factory->createXAConnection() );
+    CPPUNIT_ASSERT( connection.get() != NULL );
+
+    std::auto_ptr<XASession> session( connection->createXASession() );
+    CPPUNIT_ASSERT( session.get() != NULL );
+
+    std::auto_ptr<Destination> destination( session->createTemporaryQueue() );
+    std::auto_ptr<MessageProducer> producer( session->createProducer( destination.get() ) );
+    std::auto_ptr<MessageConsumer> consumer( session->createConsumer( destination.get() ) );
+
+    XAResource* xaResource = session->getXAResource();
+    CPPUNIT_ASSERT( xaResource != NULL );
+
+    connection->start();
+
+    auto_ptr<TextMessage> outbound1( session->createTextMessage( "First Message" ) );
+    auto_ptr<TextMessage> outbound2( session->createTextMessage( "Second Message" ) );
+
+    // start a new XA Transaction
+    std::auto_ptr<cms::Xid> ixId( this->createXid() );
+    xaResource->start( ixId.get(), 0 );
+
+    // sends a message
+    producer->send( outbound1.get() );
+
+    // commit the sent message
+    xaResource->end( ixId.get(), XAResource::TMSUCCESS );
+    xaResource->prepare( ixId.get() );
+    xaResource->commit( ixId.get(), false );
+
+    // start a new XA Transaction
+    ixId.reset( this->createXid() );
+    xaResource->start( ixId.get(), 0 );
+
+    // sends a message that gets rollbacked
+    auto_ptr<cms::Message> rollback(
+        session->createTextMessage( "I'm going to get rolled back." ) );
+    producer->send( rollback.get() );
+
+    // Roll back the sent message
+    xaResource->end( ixId.get(), XAResource::TMSUCCESS );
+    xaResource->rollback( ixId.get() );
+
+    // start a new XA Transaction
+    ixId.reset( this->createXid() );
+    xaResource->start( ixId.get(), 0 );
+
+    // sends a message
+    producer->send( outbound2.get() );
+
+    // commit the sent message
+    xaResource->end( ixId.get(), XAResource::TMSUCCESS );
+    xaResource->prepare( ixId.get() );
+    xaResource->commit( ixId.get(), false );
+
+    // receives the first message
+    auto_ptr<TextMessage> inbound1(
+        dynamic_cast<TextMessage*>( consumer->receive( 1500 ) ) );
+
+    // receives the second message
+    auto_ptr<TextMessage> inbound2(
+        dynamic_cast<TextMessage*>( consumer->receive( 4000 ) ) );
+
+    CPPUNIT_ASSERT( outbound1->getText() == inbound1->getText() );
+    CPPUNIT_ASSERT( outbound2->getText() == inbound2->getText() );
+
+    // Checks to make sure there's no other messages on the Destination.
+    CPPUNIT_ASSERT( consumer->receive( 3000 ) == NULL );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireXATransactionsTest::testSendRollbackCommitRollback() {
+
+    std::auto_ptr<XAConnectionFactory> factory(
+        XAConnectionFactory::createCMSXAConnectionFactory( getBrokerURL() ) );
+    CPPUNIT_ASSERT( factory.get() != NULL );
+
+    std::auto_ptr<XAConnection> connection( factory->createXAConnection() );
+    CPPUNIT_ASSERT( connection.get() != NULL );
+
+    std::auto_ptr<XASession> session( connection->createXASession() );
+    CPPUNIT_ASSERT( session.get() != NULL );
+
+    std::auto_ptr<Destination> destination( session->createTemporaryQueue() );
+    std::auto_ptr<MessageProducer> producer( session->createProducer( destination.get() ) );
+
+    XAResource* xaResource = session->getXAResource();
+    CPPUNIT_ASSERT( xaResource != NULL );
+
+    connection->start();
+
+    auto_ptr<TextMessage> outbound1( session->createTextMessage( "First Message" ) );
+    auto_ptr<TextMessage> outbound2( session->createTextMessage( "Second Message" ) );
+
+    // start a new XA Transaction
+    std::auto_ptr<cms::Xid> ixId( this->createXid() );
+    xaResource->start( ixId.get(), 0 );
+
+    // sends them and then rolls back.
+    producer->send( outbound1.get() );
+    producer->send( outbound2.get() );
+
+    // Roll back the sent message
+    xaResource->end( ixId.get(), XAResource::TMSUCCESS );
+    xaResource->rollback( ixId.get() );
+
+    // start a new XA Transaction
+    ixId.reset( this->createXid() );
+    xaResource->start( ixId.get(), 0 );
+
+    // Send one and commit.
+    producer->send( outbound1.get() );
+
+    // commit the sent message
+    xaResource->end( ixId.get(), XAResource::TMSUCCESS );
+    xaResource->prepare( ixId.get() );
+    xaResource->commit( ixId.get(), false );
+
+    // start a new XA Transaction
+    ixId.reset( this->createXid() );
+    xaResource->start( ixId.get(), 0 );
+
+    std::auto_ptr<MessageConsumer> consumer( session->createConsumer( destination.get() ) );
+
+    // receives the first message
+    auto_ptr<TextMessage> inbound1(
+        dynamic_cast<TextMessage*>( consumer->receive( 1500 ) ) );
+
+    CPPUNIT_ASSERT( NULL == consumer->receive( 1500 ) );
+    CPPUNIT_ASSERT( outbound1->getText() == inbound1->getText() );
+
+    // Roll back the sent message
+    xaResource->end( ixId.get(), XAResource::TMSUCCESS );
+    xaResource->rollback( ixId.get() );
+
+    consumer->close();
+
+    // start a new XA Transaction
+    ixId.reset( this->createXid() );
+    xaResource->start( ixId.get(), 0 );
+
+    consumer.reset( session->createConsumer( destination.get() ) );
+
+    inbound1.reset(
+        dynamic_cast<TextMessage*>( consumer->receive( 1500 ) ) );
+
+    CPPUNIT_ASSERT( NULL == consumer->receive( 1500 ) );
+    CPPUNIT_ASSERT( outbound1->getText() == inbound1->getText() );
+
+    // commit the received message
+    xaResource->end( ixId.get(), XAResource::TMSUCCESS );
+    xaResource->prepare( ixId.get() );
+    xaResource->commit( ixId.get(), false );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireXATransactionsTest::testWithTTLSet() {
+
+    std::auto_ptr<XAConnectionFactory> factory(
+        XAConnectionFactory::createCMSXAConnectionFactory( getBrokerURL() ) );
+    CPPUNIT_ASSERT( factory.get() != NULL );
+
+    std::auto_ptr<XAConnection> connection( factory->createXAConnection() );
+    CPPUNIT_ASSERT( connection.get() != NULL );
+
+    std::auto_ptr<XASession> session( connection->createXASession() );
+    CPPUNIT_ASSERT( session.get() != NULL );
+
+    std::auto_ptr<Destination> destination( session->createTemporaryQueue() );
+    std::auto_ptr<MessageProducer> producer( session->createProducer( destination.get() ) );
+
+    XAResource* xaResource = session->getXAResource();
+    CPPUNIT_ASSERT( xaResource != NULL );
+
+    connection->start();
+
+    auto_ptr<TextMessage> outbound1( session->createTextMessage( "First Message" ) );
+
+    const std::size_t NUM_MESSAGES = 50;
+
+    // start a new XA Transaction
+    std::auto_ptr<cms::Xid> ixId( this->createXid() );
+    xaResource->start( ixId.get(), 0 );
+
+    // sends a message
+    for( std::size_t i = 0; i < NUM_MESSAGES; ++i ) {
+        producer->send( outbound1.get(), cms::DeliveryMode::PERSISTENT, 4, 120*1000 );
+    }
+
+    // commit the sent messages
+    xaResource->end( ixId.get(), XAResource::TMSUCCESS );
+    xaResource->prepare( ixId.get() );
+    xaResource->commit( ixId.get(), false );
+
+    // start a new XA Transaction
+    ixId.reset( this->createXid() );
+    xaResource->start( ixId.get(), 0 );
+
+    std::auto_ptr<MessageConsumer> consumer( session->createConsumer( destination.get() ) );
+
+    for( std::size_t i = 0; i < NUM_MESSAGES; ++i ) {
+
+        auto_ptr<TextMessage> inbound1(
+            dynamic_cast<TextMessage*>( consumer->receive( 600000 ) ) );
+        CPPUNIT_ASSERT( inbound1.get() != NULL );
+        CPPUNIT_ASSERT( outbound1->getText() == inbound1->getText() );
+    }
+
+    // commit the received messages
+    xaResource->end( ixId.get(), XAResource::TMSUCCESS );
+    xaResource->prepare( ixId.get() );
+    xaResource->commit( ixId.get(), false );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireXATransactionsTest::testXAResource_Exception1() {
+
+    std::auto_ptr<XAConnectionFactory> factory(
+        XAConnectionFactory::createCMSXAConnectionFactory( getBrokerURL() ) );
+    CPPUNIT_ASSERT( factory.get() != NULL );
+
+    std::auto_ptr<XAConnection> connection( factory->createXAConnection() );
+    CPPUNIT_ASSERT( connection.get() != NULL );
+
+    std::auto_ptr<XASession> session( connection->createXASession() );
+    CPPUNIT_ASSERT( session.get() != NULL );
+
+    std::auto_ptr<Destination> destination( session->createTemporaryQueue() );
+    std::auto_ptr<MessageProducer> producer( session->createProducer( destination.get() ) );
+
+    XAResource* xaResource = session->getXAResource();
+    CPPUNIT_ASSERT( xaResource != NULL );
+
+    // start a new XA Transaction
+    std::auto_ptr<cms::Xid> ixId( this->createXid() );
+    xaResource->start( ixId.get(), 0 );
+
+    // prepare the sent messages without an end call.
+    CPPUNIT_ASSERT_THROW_MESSAGE(
+        "Prepare Should have thrown an XAException",
+        xaResource->prepare( ixId.get() ),
+        XAException );
+
+    xaResource->forget( ixId.get() );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireXATransactionsTest::testXAResource_Exception2() {
+
+    std::auto_ptr<XAConnectionFactory> factory(
+        XAConnectionFactory::createCMSXAConnectionFactory( getBrokerURL() ) );
+    CPPUNIT_ASSERT( factory.get() != NULL );
+
+    std::auto_ptr<XAConnection> connection( factory->createXAConnection() );
+    CPPUNIT_ASSERT( connection.get() != NULL );
+
+    std::auto_ptr<XASession> session( connection->createXASession() );
+    CPPUNIT_ASSERT( session.get() != NULL );
+
+    std::auto_ptr<Destination> destination( session->createTemporaryQueue() );
+    std::auto_ptr<MessageProducer> producer( session->createProducer( destination.get() ) );
+
+    XAResource* xaResource = session->getXAResource();
+    CPPUNIT_ASSERT( xaResource != NULL );
+
+    // start a new XA Transaction
+    std::auto_ptr<cms::Xid> ixId( this->createXid() );
+    xaResource->start( ixId.get(), 0 );
+
+    // commit the sent messages without an end call.
+    CPPUNIT_ASSERT_THROW_MESSAGE(
+        "Commit Should have thrown an XAException",
+        xaResource->commit( ixId.get(), true ),
+        XAException );
+
+    xaResource->forget( ixId.get() );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireXATransactionsTest::testXAResource_Exception3() {
+
+    std::auto_ptr<XAConnectionFactory> factory(
+        XAConnectionFactory::createCMSXAConnectionFactory( getBrokerURL() ) );
+    CPPUNIT_ASSERT( factory.get() != NULL );
+
+    std::auto_ptr<XAConnection> connection( factory->createXAConnection() );
+    CPPUNIT_ASSERT( connection.get() != NULL );
+
+    std::auto_ptr<XASession> session( connection->createXASession() );
+    CPPUNIT_ASSERT( session.get() != NULL );
+
+    std::auto_ptr<Destination> destination( session->createTemporaryQueue() );
+    std::auto_ptr<MessageProducer> producer( session->createProducer( destination.get() ) );
+
+    XAResource* xaResource = session->getXAResource();
+    CPPUNIT_ASSERT( xaResource != NULL );
+
+    // start a new XA Transaction
+    std::auto_ptr<cms::Xid> ixId( this->createXid() );
+    std::auto_ptr<cms::Xid> ixIdOther( this->createXid() );
+    xaResource->start( ixId.get(), 0 );
+
+    // rollback the sent messages without an end call.
+    CPPUNIT_ASSERT_THROW_MESSAGE(
+        "end Should have thrown an XAException",
+        xaResource->end( ixIdOther.get(), XAResource::TMSUSPEND ),
+        XAException );
+
+    xaResource->forget( ixId.get() );
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireXATransactionsTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireXATransactionsTest.h?rev=1036656&r1=1036655&r2=1036656&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireXATransactionsTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireXATransactionsTest.h Thu Nov 18 22:23:49 2010
@@ -18,22 +18,42 @@
 #ifndef _ACTIVEMQ_TEST_OPENWIRE_OPENWIREXATRANSACTIONSTEST_H_
 #define _ACTIVEMQ_TEST_OPENWIRE_OPENWIREXATRANSACTIONSTEST_H_
 
-#include <activemq/test/CMSTestFixture.h>
+#include <cppunit/TestFixture.h>
+#include <cppunit/extensions/HelperMacros.h>
+
 #include <activemq/util/IntegrationCommon.h>
+#include <activemq/util/IdGenerator.h>
+
+#include <cms/Xid.h>
 
 namespace activemq {
 namespace test {
 namespace openwire {
 
-    class OpenwireXATransactionsTest : public CMSTestFixture {
+    class OpenwireXATransactionsTest : public CppUnit::TestFixture {
 
         CPPUNIT_TEST_SUITE( OpenwireXATransactionsTest );
         CPPUNIT_TEST( testCreateXAConnectionFactory );
         CPPUNIT_TEST( testCreateXAConnection );
         CPPUNIT_TEST( testCreateXASession );
+        CPPUNIT_TEST( testGetXAResource );
+        CPPUNIT_TEST( testXAResource_Exception1 );
+        CPPUNIT_TEST( testXAResource_Exception2 );
+        CPPUNIT_TEST( testXAResource_Exception3 );
         CPPUNIT_TEST( testSendReceiveOutsideTX );
+        CPPUNIT_TEST( testSendReceiveTransactedBatches );
+        CPPUNIT_TEST( testSendRollback );
+        CPPUNIT_TEST( testWithTTLSet );
+        CPPUNIT_TEST( testSendRollbackCommitRollback );
         CPPUNIT_TEST_SUITE_END();
 
+    private:
+
+        static const int batchCount;
+        static const int batchSize;
+
+        util::IdGenerator txIdGen;
+
     public:
 
         OpenwireXATransactionsTest();
@@ -43,10 +63,25 @@ namespace openwire {
             return activemq::util::IntegrationCommon::getInstance().getOpenwireURL();
         }
 
+        virtual void setUp() {}
+        virtual void tearDown() {}
+
         void testCreateXAConnectionFactory();
         void testCreateXAConnection();
         void testCreateXASession();
+        void testGetXAResource();
         void testSendReceiveOutsideTX();
+        void testSendReceiveTransactedBatches();
+        void testSendRollback();
+        void testWithTTLSet();
+        void testSendRollbackCommitRollback();
+        void testXAResource_Exception1();
+        void testXAResource_Exception2();
+        void testXAResource_Exception3();
+
+    private:
+
+        cms::Xid* createXid() const;
 
     };