You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/11/18 23:23:49 UTC
svn commit: r1036656 - in /activemq/activemq-cpp/trunk/activemq-cpp/src:
main/activemq/core/ test-integration/activemq/test/openwire/
Author: tabish
Date: Thu Nov 18 22:23:49 2010
New Revision: 1036656
URL: http://svn.apache.org/viewvc?rev=1036656&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQCPP-329
More tests and an additional fix.
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireXATransactionsTest.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireXATransactionsTest.h
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp?rev=1036656&r1=1036655&r2=1036656&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp Thu Nov 18 22:23:49 2010
@@ -303,11 +303,6 @@ void ActiveMQTransactionContext::afterRo
////////////////////////////////////////////////////////////////////////////////
const Pointer<TransactionId>& ActiveMQTransactionContext::getTransactionId() const {
- if( this->context->transactionId == NULL ) {
- throw decaf::lang::exceptions::InvalidStateException(
- __FILE__, __LINE__, "Transaction Not Started." );
- }
-
return this->context->transactionId;
}
@@ -590,6 +585,7 @@ void ActiveMQTransactionContext::end( co
setXid( NULL );
}
+
} else {
throw XAException( XAException::XAER_INVAL );
}
@@ -694,17 +690,6 @@ void ActiveMQTransactionContext::setXid(
} catch( CMSException& e ) {
throw toXAException( e );
}
-
-// // Add our self to the list of contexts that are interested in
-// // post commit/rollback events.
-// List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId);
-// if (l == NULL) {
-// l = new ArrayList<TransactionContext>(3);
-// ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l);
-// l.add(this);
-// } else if (!l.contains(this)) {
-// l.add(this);
-// }
}
// remove the association currently in place.
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireXATransactionsTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireXATransactionsTest.cpp?rev=1036656&r1=1036655&r2=1036656&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireXATransactionsTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireXATransactionsTest.cpp Thu Nov 18 22:23:49 2010
@@ -21,8 +21,10 @@
#include <activemq/core/ActiveMQXAConnection.h>
#include <activemq/core/ActiveMQXASession.h>
#include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/commands/XATransactionId.h>
#include <decaf/lang/Thread.h>
+#include <decaf/util/UUID.h>
#include <cms/ConnectionFactory.h>
#include <cms/Connection.h>
@@ -41,12 +43,19 @@ using namespace cms;
using namespace std;
using namespace decaf;
using namespace decaf::lang;
+using namespace decaf::util;
using namespace activemq;
using namespace activemq::core;
+using namespace activemq::commands;
+using namespace activemq::exceptions;
using namespace activemq::test;
using namespace activemq::test::openwire;
////////////////////////////////////////////////////////////////////////////////
+const int OpenwireXATransactionsTest::batchCount = 10;
+const int OpenwireXATransactionsTest::batchSize = 20;
+
+////////////////////////////////////////////////////////////////////////////////
OpenwireXATransactionsTest::OpenwireXATransactionsTest() {
}
@@ -115,6 +124,26 @@ void OpenwireXATransactionsTest::testCre
}
////////////////////////////////////////////////////////////////////////////////
+void OpenwireXATransactionsTest::testGetXAResource() { // Checks that an XASession hands out a usable XAResource.
+
+ std::auto_ptr<XAConnectionFactory> factory(
+ XAConnectionFactory::createCMSXAConnectionFactory( getBrokerURL() ) );
+ CPPUNIT_ASSERT( factory.get() != NULL );
+
+ std::auto_ptr<XAConnection> connection( factory->createXAConnection() );
+ CPPUNIT_ASSERT( connection.get() != NULL );
+
+ std::auto_ptr<XASession> session( connection->createXASession() );
+ CPPUNIT_ASSERT( session.get() != NULL );
+
+ XAResource* xaResource = session->getXAResource(); // held as a raw pointer and never deleted by this test
+ CPPUNIT_ASSERT( xaResource != NULL );
+ CPPUNIT_ASSERT( xaResource->isSameRM( xaResource ) ); // the resource must report itself as the same RM
+ CPPUNIT_ASSERT_NO_THROW( xaResource->setTransactionTimeout( 10000 ) );
+ CPPUNIT_ASSERT( xaResource->getTransactionTimeout() == 0 ); // still expects 0 after the set above; NOTE(review): presumably timeouts are unsupported - confirm
+}
+
+////////////////////////////////////////////////////////////////////////////////
void OpenwireXATransactionsTest::testSendReceiveOutsideTX() {
std::auto_ptr<XAConnectionFactory> factory(
@@ -127,20 +156,450 @@ void OpenwireXATransactionsTest::testSen
std::auto_ptr<XASession> session( connection->createXASession() );
CPPUNIT_ASSERT( session.get() != NULL );
+ ActiveMQXASession* amqXASession = dynamic_cast<ActiveMQXASession*>( session.get() );
+ CPPUNIT_ASSERT( amqXASession != NULL );
+
std::auto_ptr<Destination> destination( session->createTemporaryQueue() );
std::auto_ptr<MessageProducer> producer( session->createProducer( destination.get() ) );
std::auto_ptr<MessageConsumer> consumer( session->createConsumer( destination.get() ) );
+ CPPUNIT_ASSERT( amqXASession->isAutoAcknowledge() == true );
+ CPPUNIT_ASSERT( amqXASession->isTransacted() == false );
+
connection->start();
for( int i = 0; i < 50; ++i ) {
- std::auto_ptr<Message> message( session->createTextMessage( "TEST" ) );
+ std::auto_ptr<cms::Message> message( session->createTextMessage( "TEST" ) );
producer->send( message.get() );
}
for( int i = 0; i < 50; ++i ) {
- std::auto_ptr<Message> message( consumer->receive( 3000 ) );
+ std::auto_ptr<cms::Message> message( consumer->receive( 3000 ) );
CPPUNIT_ASSERT( message.get() != NULL );
CPPUNIT_ASSERT( dynamic_cast<TextMessage*>( message.get() ) != NULL );
}
}
+
+////////////////////////////////////////////////////////////////////////////////
+// Builds a new Xid for use in a test.  The branch qualifier is taken from a
+// random UUID string and the global transaction id from this fixture's
+// IdGenerator; both are truncated to the limits declared on cms::Xid.
+//
+// @return a heap allocated Xid; the caller takes ownership and must delete it.
+cms::Xid* OpenwireXATransactionsTest::createXid() const {
+
+    std::string branchQualStr = UUID::randomUUID().toString();
+    std::string globalTxIdStr = this->txIdGen.generateId();
+
+    std::vector<unsigned char> branchQual( branchQualStr.begin(), branchQualStr.end() );
+    std::vector<unsigned char> globalTxId( globalTxIdStr.begin(), globalTxIdStr.end() );
+
+    // Clamp both byte arrays to the maximum sizes declared on Xid.
+    if( static_cast<int>( branchQual.size() ) > Xid::MAXBQUALSIZE ) {
+        branchQual.resize( Xid::MAXBQUALSIZE );
+    }
+
+    if( static_cast<int>( globalTxId.size() ) > Xid::MAXGTRIDSIZE ) {
+        globalTxId.resize( Xid::MAXGTRIDSIZE );
+    }
+
+    // Own the new id until it is fully populated so that it is not leaked
+    // should one of the setters below throw.
+    std::auto_ptr<XATransactionId> id( new XATransactionId() );
+
+    id->setFormatId( 0 );
+    id->setBranchQualifier( branchQual );
+    id->setGlobalTransactionId( globalTxId );
+
+    return id.release();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireXATransactionsTest::testSendReceiveTransactedBatches() { // Sends and then receives batchCount batches of batchSize messages, each batch in its own XA transaction.
+
+ std::auto_ptr<XAConnectionFactory> factory(
+ XAConnectionFactory::createCMSXAConnectionFactory( getBrokerURL() ) );
+ CPPUNIT_ASSERT( factory.get() != NULL );
+
+ std::auto_ptr<XAConnection> connection( factory->createXAConnection() );
+ CPPUNIT_ASSERT( connection.get() != NULL );
+
+ std::auto_ptr<XASession> session( connection->createXASession() );
+ CPPUNIT_ASSERT( session.get() != NULL );
+
+ std::auto_ptr<Destination> destination( session->createTemporaryQueue() );
+ std::auto_ptr<MessageProducer> producer( session->createProducer( destination.get() ) );
+ std::auto_ptr<MessageConsumer> consumer( session->createConsumer( destination.get() ) );
+
+ XAResource* xaResource = session->getXAResource();
+ CPPUNIT_ASSERT( xaResource != NULL );
+
+ connection->start();
+
+ for( int j = 0; j < batchCount; j++ ) { // one send transaction followed by one receive transaction per batch
+
+ std::auto_ptr<cms::Xid> txIdSend( this->createXid() ); // fresh Xid for the send half of this batch
+ xaResource->start( txIdSend.get(), 0 );
+
+ std::auto_ptr<TextMessage> message( session->createTextMessage( "Batch Message" ) ); // the same message object is sent batchSize times
+
+ for( int i = 0; i < batchSize; i++ ) {
+ CPPUNIT_ASSERT_NO_THROW_MESSAGE(
+ "Send should not throw an exception here.",
+ producer->send( message.get() ) );
+ }
+
+ CPPUNIT_ASSERT_NO_THROW_MESSAGE(
+ "Should not have thrown an Exception for xaResource->end",
+ xaResource->end( txIdSend.get(), XAResource::TMSUCCESS ) );
+ CPPUNIT_ASSERT_NO_THROW_MESSAGE(
+ "Should not have thrown an Exception for xaResource->prepare",
+ xaResource->prepare( txIdSend.get() ) );
+ CPPUNIT_ASSERT_NO_THROW_MESSAGE(
+ "Should not have thrown an Exception for xaResource->commit",
+ xaResource->commit( txIdSend.get(), false ) );
+
+ std::auto_ptr<cms::Xid> txIdRecv( this->createXid() ); // separate Xid for the receive half of this batch
+ xaResource->start( txIdRecv.get(), 0 );
+
+ for( int i = 0; i < batchSize; i++ ) {
+
+ CPPUNIT_ASSERT_NO_THROW_MESSAGE(
+ "Receive Shouldn't throw a Message here:",
+ message.reset( dynamic_cast<TextMessage*>( consumer->receive( 1000 * 5 ) ) ) );
+
+ CPPUNIT_ASSERT_MESSAGE(
+ "Failed to receive all messages in batch", message.get() != NULL );
+ CPPUNIT_ASSERT( string("Batch Message") == message->getText() );
+ }
+
+ CPPUNIT_ASSERT_NO_THROW_MESSAGE(
+ "Should not have thrown an Exception for xaResource->end",
+ xaResource->end( txIdRecv.get(), XAResource::TMSUCCESS ) );
+ CPPUNIT_ASSERT_NO_THROW_MESSAGE(
+ "Should not have thrown an Exception for xaResource->prepare",
+ xaResource->prepare( txIdRecv.get() ) );
+ CPPUNIT_ASSERT_NO_THROW_MESSAGE(
+ "Should not have thrown an Exception for xaResource->commit",
+ xaResource->commit( txIdRecv.get(), false ) );
+ }
+
+ std::auto_ptr<cms::Message> message; // holds any stray message so that it is deleted on exit
+
+ CPPUNIT_ASSERT_NO_THROW_MESSAGE(
+ "Receive Shouldn't throw a Message here:",
+ message.reset( consumer->receive( 2000 ) ) );
+
+ CPPUNIT_ASSERT_MESSAGE(
+ "Unexpected Message Received after XA Batches all processed", message.get() == NULL );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Verifies that a message sent inside an XA transaction that is rolled back is
+// never delivered, while the messages sent in the committed transactions
+// before and after it are delivered in the order they were sent.
+void OpenwireXATransactionsTest::testSendRollback() {
+
+    std::auto_ptr<XAConnectionFactory> factory(
+        XAConnectionFactory::createCMSXAConnectionFactory( getBrokerURL() ) );
+    CPPUNIT_ASSERT( factory.get() != NULL );
+
+    std::auto_ptr<XAConnection> connection( factory->createXAConnection() );
+    CPPUNIT_ASSERT( connection.get() != NULL );
+
+    std::auto_ptr<XASession> session( connection->createXASession() );
+    CPPUNIT_ASSERT( session.get() != NULL );
+
+    std::auto_ptr<Destination> destination( session->createTemporaryQueue() );
+    std::auto_ptr<MessageProducer> producer( session->createProducer( destination.get() ) );
+    std::auto_ptr<MessageConsumer> consumer( session->createConsumer( destination.get() ) );
+
+    XAResource* xaResource = session->getXAResource();
+    CPPUNIT_ASSERT( xaResource != NULL );
+
+    connection->start();
+
+    std::auto_ptr<TextMessage> outbound1( session->createTextMessage( "First Message" ) );
+    std::auto_ptr<TextMessage> outbound2( session->createTextMessage( "Second Message" ) );
+
+    // start a new XA Transaction
+    std::auto_ptr<cms::Xid> ixId( this->createXid() );
+    xaResource->start( ixId.get(), 0 );
+
+    // sends a message
+    producer->send( outbound1.get() );
+
+    // commit the sent message
+    xaResource->end( ixId.get(), XAResource::TMSUCCESS );
+    xaResource->prepare( ixId.get() );
+    xaResource->commit( ixId.get(), false );
+
+    // start a new XA Transaction
+    ixId.reset( this->createXid() );
+    xaResource->start( ixId.get(), 0 );
+
+    // sends a message that gets rolled back
+    std::auto_ptr<cms::Message> rollback(
+        session->createTextMessage( "I'm going to get rolled back." ) );
+    producer->send( rollback.get() );
+
+    // Roll back the sent message
+    xaResource->end( ixId.get(), XAResource::TMSUCCESS );
+    xaResource->rollback( ixId.get() );
+
+    // start a new XA Transaction
+    ixId.reset( this->createXid() );
+    xaResource->start( ixId.get(), 0 );
+
+    // sends a message
+    producer->send( outbound2.get() );
+
+    // commit the sent message
+    xaResource->end( ixId.get(), XAResource::TMSUCCESS );
+    xaResource->prepare( ixId.get() );
+    xaResource->commit( ixId.get(), false );
+
+    // Receives the first message.  The received object is owned as a
+    // cms::Message so it is released even when it is not a TextMessage, and
+    // the cast result is checked before use so that a timeout or an
+    // unexpected message type fails the test instead of dereferencing NULL.
+    std::auto_ptr<cms::Message> received1( consumer->receive( 1500 ) );
+    TextMessage* inbound1 = dynamic_cast<TextMessage*>( received1.get() );
+    CPPUNIT_ASSERT_MESSAGE(
+        "First committed message was not received", inbound1 != NULL );
+
+    // receives the second message
+    std::auto_ptr<cms::Message> received2( consumer->receive( 4000 ) );
+    TextMessage* inbound2 = dynamic_cast<TextMessage*>( received2.get() );
+    CPPUNIT_ASSERT_MESSAGE(
+        "Second committed message was not received", inbound2 != NULL );
+
+    CPPUNIT_ASSERT( outbound1->getText() == inbound1->getText() );
+    CPPUNIT_ASSERT( outbound2->getText() == inbound2->getText() );
+
+    // Checks to make sure there's no other messages on the Destination.  The
+    // result is owned so that an unexpected message is not leaked.
+    std::auto_ptr<cms::Message> unexpected( consumer->receive( 3000 ) );
+    CPPUNIT_ASSERT( unexpected.get() == NULL );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Verifies redelivery across XA rollbacks: two sends are rolled back, one send
+// is committed, that message is received in a transaction which is rolled
+// back, and the same message must then be received again in a new
+// transaction which is committed.
+void OpenwireXATransactionsTest::testSendRollbackCommitRollback() {
+
+    std::auto_ptr<XAConnectionFactory> factory(
+        XAConnectionFactory::createCMSXAConnectionFactory( getBrokerURL() ) );
+    CPPUNIT_ASSERT( factory.get() != NULL );
+
+    std::auto_ptr<XAConnection> connection( factory->createXAConnection() );
+    CPPUNIT_ASSERT( connection.get() != NULL );
+
+    std::auto_ptr<XASession> session( connection->createXASession() );
+    CPPUNIT_ASSERT( session.get() != NULL );
+
+    std::auto_ptr<Destination> destination( session->createTemporaryQueue() );
+    std::auto_ptr<MessageProducer> producer( session->createProducer( destination.get() ) );
+
+    XAResource* xaResource = session->getXAResource();
+    CPPUNIT_ASSERT( xaResource != NULL );
+
+    connection->start();
+
+    std::auto_ptr<TextMessage> outbound1( session->createTextMessage( "First Message" ) );
+    std::auto_ptr<TextMessage> outbound2( session->createTextMessage( "Second Message" ) );
+
+    // start a new XA Transaction
+    std::auto_ptr<cms::Xid> ixId( this->createXid() );
+    xaResource->start( ixId.get(), 0 );
+
+    // sends them and then rolls back.
+    producer->send( outbound1.get() );
+    producer->send( outbound2.get() );
+
+    // Roll back the sent messages
+    xaResource->end( ixId.get(), XAResource::TMSUCCESS );
+    xaResource->rollback( ixId.get() );
+
+    // start a new XA Transaction
+    ixId.reset( this->createXid() );
+    xaResource->start( ixId.get(), 0 );
+
+    // Send one and commit.
+    producer->send( outbound1.get() );
+
+    // commit the sent message
+    xaResource->end( ixId.get(), XAResource::TMSUCCESS );
+    xaResource->prepare( ixId.get() );
+    xaResource->commit( ixId.get(), false );
+
+    // start a new XA Transaction
+    ixId.reset( this->createXid() );
+    xaResource->start( ixId.get(), 0 );
+
+    std::auto_ptr<MessageConsumer> consumer( session->createConsumer( destination.get() ) );
+
+    // Receives the first message.  Received objects are owned as cms::Message
+    // so nothing is leaked, and the cast result is checked before use so that
+    // a timeout fails the test instead of dereferencing NULL.
+    std::auto_ptr<cms::Message> received( consumer->receive( 1500 ) );
+    TextMessage* inbound1 = dynamic_cast<TextMessage*>( received.get() );
+    CPPUNIT_ASSERT_MESSAGE(
+        "Committed message was not received", inbound1 != NULL );
+
+    // Only the single committed message may be on the Destination.
+    std::auto_ptr<cms::Message> extra( consumer->receive( 1500 ) );
+    CPPUNIT_ASSERT( extra.get() == NULL );
+    CPPUNIT_ASSERT( outbound1->getText() == inbound1->getText() );
+
+    // Roll back the received message
+    xaResource->end( ixId.get(), XAResource::TMSUCCESS );
+    xaResource->rollback( ixId.get() );
+
+    consumer->close();
+
+    // start a new XA Transaction
+    ixId.reset( this->createXid() );
+    xaResource->start( ixId.get(), 0 );
+
+    consumer.reset( session->createConsumer( destination.get() ) );
+
+    // The rolled back message must be delivered again to the new consumer.
+    received.reset( consumer->receive( 1500 ) );
+    inbound1 = dynamic_cast<TextMessage*>( received.get() );
+    CPPUNIT_ASSERT_MESSAGE(
+        "Rolled back message was not redelivered", inbound1 != NULL );
+
+    extra.reset( consumer->receive( 1500 ) );
+    CPPUNIT_ASSERT( extra.get() == NULL );
+    CPPUNIT_ASSERT( outbound1->getText() == inbound1->getText() );
+
+    // commit the received message
+    xaResource->end( ixId.get(), XAResource::TMSUCCESS );
+    xaResource->prepare( ixId.get() );
+    xaResource->commit( ixId.get(), false );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireXATransactionsTest::testWithTTLSet() { // Sends NUM_MESSAGES messages with explicit send options in one XA transaction and receives them all in a second one.
+
+ std::auto_ptr<XAConnectionFactory> factory(
+ XAConnectionFactory::createCMSXAConnectionFactory( getBrokerURL() ) );
+ CPPUNIT_ASSERT( factory.get() != NULL );
+
+ std::auto_ptr<XAConnection> connection( factory->createXAConnection() );
+ CPPUNIT_ASSERT( connection.get() != NULL );
+
+ std::auto_ptr<XASession> session( connection->createXASession() );
+ CPPUNIT_ASSERT( session.get() != NULL );
+
+ std::auto_ptr<Destination> destination( session->createTemporaryQueue() );
+ std::auto_ptr<MessageProducer> producer( session->createProducer( destination.get() ) );
+
+ XAResource* xaResource = session->getXAResource();
+ CPPUNIT_ASSERT( xaResource != NULL );
+
+ connection->start();
+
+ auto_ptr<TextMessage> outbound1( session->createTextMessage( "First Message" ) );
+
+ const std::size_t NUM_MESSAGES = 50;
+
+ // start a new XA Transaction
+ std::auto_ptr<cms::Xid> ixId( this->createXid() );
+ xaResource->start( ixId.get(), 0 );
+
+ // sends the same message NUM_MESSAGES times
+ for( std::size_t i = 0; i < NUM_MESSAGES; ++i ) {
+ producer->send( outbound1.get(), cms::DeliveryMode::PERSISTENT, 4, 120*1000 ); // NOTE(review): trailing args are presumably priority 4 and a 120 second time-to-live in ms - confirm against cms::MessageProducer
+ }
+
+ // commit the sent messages
+ xaResource->end( ixId.get(), XAResource::TMSUCCESS );
+ xaResource->prepare( ixId.get() );
+ xaResource->commit( ixId.get(), false );
+
+ // start a new XA Transaction
+ ixId.reset( this->createXid() );
+ xaResource->start( ixId.get(), 0 );
+
+ std::auto_ptr<MessageConsumer> consumer( session->createConsumer( destination.get() ) ); // the consumer is only created once the sends are committed
+
+ for( std::size_t i = 0; i < NUM_MESSAGES; ++i ) {
+
+ auto_ptr<TextMessage> inbound1(
+ dynamic_cast<TextMessage*>( consumer->receive( 600000 ) ) );
+ CPPUNIT_ASSERT( inbound1.get() != NULL );
+ CPPUNIT_ASSERT( outbound1->getText() == inbound1->getText() );
+ }
+
+ // commit the received messages
+ xaResource->end( ixId.get(), XAResource::TMSUCCESS );
+ xaResource->prepare( ixId.get() );
+ xaResource->commit( ixId.get(), false );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireXATransactionsTest::testXAResource_Exception1() { // Expects prepare() to throw XAException when the started transaction was never ended.
+
+ std::auto_ptr<XAConnectionFactory> factory(
+ XAConnectionFactory::createCMSXAConnectionFactory( getBrokerURL() ) );
+ CPPUNIT_ASSERT( factory.get() != NULL );
+
+ std::auto_ptr<XAConnection> connection( factory->createXAConnection() );
+ CPPUNIT_ASSERT( connection.get() != NULL );
+
+ std::auto_ptr<XASession> session( connection->createXASession() );
+ CPPUNIT_ASSERT( session.get() != NULL );
+
+ std::auto_ptr<Destination> destination( session->createTemporaryQueue() );
+ std::auto_ptr<MessageProducer> producer( session->createProducer( destination.get() ) );
+
+ XAResource* xaResource = session->getXAResource();
+ CPPUNIT_ASSERT( xaResource != NULL );
+
+ // start a new XA Transaction
+ std::auto_ptr<cms::Xid> ixId( this->createXid() );
+ xaResource->start( ixId.get(), 0 );
+
+ // prepare the transaction without a preceding end call; nothing is sent in this test.
+ CPPUNIT_ASSERT_THROW_MESSAGE(
+ "Prepare Should have thrown an XAException",
+ xaResource->prepare( ixId.get() ),
+ XAException );
+
+ xaResource->forget( ixId.get() ); // NOTE(review): presumably discards the still-open transaction - confirm
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireXATransactionsTest::testXAResource_Exception2() { // Expects commit() to throw XAException when the started transaction was never ended.
+
+ std::auto_ptr<XAConnectionFactory> factory(
+ XAConnectionFactory::createCMSXAConnectionFactory( getBrokerURL() ) );
+ CPPUNIT_ASSERT( factory.get() != NULL );
+
+ std::auto_ptr<XAConnection> connection( factory->createXAConnection() );
+ CPPUNIT_ASSERT( connection.get() != NULL );
+
+ std::auto_ptr<XASession> session( connection->createXASession() );
+ CPPUNIT_ASSERT( session.get() != NULL );
+
+ std::auto_ptr<Destination> destination( session->createTemporaryQueue() );
+ std::auto_ptr<MessageProducer> producer( session->createProducer( destination.get() ) );
+
+ XAResource* xaResource = session->getXAResource();
+ CPPUNIT_ASSERT( xaResource != NULL );
+
+ // start a new XA Transaction
+ std::auto_ptr<cms::Xid> ixId( this->createXid() );
+ xaResource->start( ixId.get(), 0 );
+
+ // commit the transaction without a preceding end call; nothing is sent in this test.
+ CPPUNIT_ASSERT_THROW_MESSAGE(
+ "Commit Should have thrown an XAException",
+ xaResource->commit( ixId.get(), true ),
+ XAException );
+
+ xaResource->forget( ixId.get() ); // NOTE(review): presumably discards the still-open transaction - confirm
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireXATransactionsTest::testXAResource_Exception3() { // Expects end() to throw XAException when given an Xid other than the one that was started.
+
+ std::auto_ptr<XAConnectionFactory> factory(
+ XAConnectionFactory::createCMSXAConnectionFactory( getBrokerURL() ) );
+ CPPUNIT_ASSERT( factory.get() != NULL );
+
+ std::auto_ptr<XAConnection> connection( factory->createXAConnection() );
+ CPPUNIT_ASSERT( connection.get() != NULL );
+
+ std::auto_ptr<XASession> session( connection->createXASession() );
+ CPPUNIT_ASSERT( session.get() != NULL );
+
+ std::auto_ptr<Destination> destination( session->createTemporaryQueue() );
+ std::auto_ptr<MessageProducer> producer( session->createProducer( destination.get() ) );
+
+ XAResource* xaResource = session->getXAResource();
+ CPPUNIT_ASSERT( xaResource != NULL );
+
+ // start a new XA Transaction
+ std::auto_ptr<cms::Xid> ixId( this->createXid() );
+ std::auto_ptr<cms::Xid> ixIdOther( this->createXid() ); // a second Xid that is never started
+ xaResource->start( ixId.get(), 0 );
+
+ // call end with the Xid that was not started (ixIdOther) instead of ixId.
+ CPPUNIT_ASSERT_THROW_MESSAGE(
+ "end Should have thrown an XAException",
+ xaResource->end( ixIdOther.get(), XAResource::TMSUSPEND ),
+ XAException );
+
+ xaResource->forget( ixId.get() ); // NOTE(review): presumably discards the still-open transaction - confirm
+}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireXATransactionsTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireXATransactionsTest.h?rev=1036656&r1=1036655&r2=1036656&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireXATransactionsTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireXATransactionsTest.h Thu Nov 18 22:23:49 2010
@@ -18,22 +18,42 @@
#ifndef _ACTIVEMQ_TEST_OPENWIRE_OPENWIREXATRANSACTIONSTEST_H_
#define _ACTIVEMQ_TEST_OPENWIRE_OPENWIREXATRANSACTIONSTEST_H_
-#include <activemq/test/CMSTestFixture.h>
+#include <cppunit/TestFixture.h>
+#include <cppunit/extensions/HelperMacros.h>
+
#include <activemq/util/IntegrationCommon.h>
+#include <activemq/util/IdGenerator.h>
+
+#include <cms/Xid.h>
namespace activemq {
namespace test {
namespace openwire {
- class OpenwireXATransactionsTest : public CMSTestFixture {
+ class OpenwireXATransactionsTest : public CppUnit::TestFixture {
CPPUNIT_TEST_SUITE( OpenwireXATransactionsTest );
CPPUNIT_TEST( testCreateXAConnectionFactory );
CPPUNIT_TEST( testCreateXAConnection );
CPPUNIT_TEST( testCreateXASession );
+ CPPUNIT_TEST( testGetXAResource );
+ CPPUNIT_TEST( testXAResource_Exception1 );
+ CPPUNIT_TEST( testXAResource_Exception2 );
+ CPPUNIT_TEST( testXAResource_Exception3 );
CPPUNIT_TEST( testSendReceiveOutsideTX );
+ CPPUNIT_TEST( testSendReceiveTransactedBatches );
+ CPPUNIT_TEST( testSendRollback );
+ CPPUNIT_TEST( testWithTTLSet );
+ CPPUNIT_TEST( testSendRollbackCommitRollback );
CPPUNIT_TEST_SUITE_END();
+ private:
+
+ static const int batchCount;
+ static const int batchSize;
+
+ util::IdGenerator txIdGen;
+
public:
OpenwireXATransactionsTest();
@@ -43,10 +63,25 @@ namespace openwire {
return activemq::util::IntegrationCommon::getInstance().getOpenwireURL();
}
+ virtual void setUp() {}
+ virtual void tearDown() {}
+
void testCreateXAConnectionFactory();
void testCreateXAConnection();
void testCreateXASession();
+ void testGetXAResource();
void testSendReceiveOutsideTX();
+ void testSendReceiveTransactedBatches();
+ void testSendRollback();
+ void testWithTTLSet();
+ void testSendRollbackCommitRollback();
+ void testXAResource_Exception1();
+ void testXAResource_Exception2();
+ void testXAResource_Exception3();
+
+ private:
+
+ cms::Xid* createXid() const;
};