You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/11/17 15:21:19 UTC
svn commit: r1036054 - in
/activemq/activemq-cpp/trunk/activemq-cpp/src/main: ./ activemq/core/
Author: tabish
Date: Wed Nov 17 14:21:19 2010
New Revision: 1036054
URL: http://svn.apache.org/viewvc?rev=1036054&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQCPP-329
Initial XA Support added.
Added:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnection.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnection.h (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnectionFactory.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnectionFactory.h (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXASession.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXASession.h (with props)
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.h
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am?rev=1036054&r1=1036053&r2=1036054&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am Wed Nov 17 14:21:19 2010
@@ -94,6 +94,9 @@ cc_sources = \
activemq/core/ActiveMQSession.cpp \
activemq/core/ActiveMQSessionExecutor.cpp \
activemq/core/ActiveMQTransactionContext.cpp \
+ activemq/core/ActiveMQXAConnection.cpp \
+ activemq/core/ActiveMQXAConnectionFactory.cpp \
+ activemq/core/ActiveMQXASession.cpp \
activemq/core/FifoMessageDispatchChannel.cpp \
activemq/core/PrefetchPolicy.cpp \
activemq/core/RedeliveryPolicy.cpp \
@@ -261,6 +264,8 @@ cc_sources = \
cms/TemporaryTopic.cpp \
cms/TextMessage.cpp \
cms/Topic.cpp \
+ cms/TransactionInProgressException.cpp \
+ cms/TransactionRolledBackException.cpp \
cms/UnsupportedOperationException.cpp \
cms/XAConnection.cpp \
cms/XAConnectionFactory.cpp \
@@ -523,6 +528,9 @@ h_sources = \
activemq/core/ActiveMQSession.h \
activemq/core/ActiveMQSessionExecutor.h \
activemq/core/ActiveMQTransactionContext.h \
+ activemq/core/ActiveMQXAConnection.h \
+ activemq/core/ActiveMQXAConnectionFactory.h \
+ activemq/core/ActiveMQXASession.h \
activemq/core/DispatchData.h \
activemq/core/Dispatcher.h \
activemq/core/FifoMessageDispatchChannel.h \
@@ -717,6 +725,8 @@ h_sources = \
cms/TemporaryTopic.h \
cms/TextMessage.h \
cms/Topic.h \
+ cms/TransactionInProgressException.h \
+ cms/TransactionRolledBackException.h \
cms/UnsupportedOperationException.h \
cms/XAConnection.h \
cms/XAConnectionFactory.h \
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=1036054&r1=1036053&r2=1036054&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Wed Nov 17 14:21:19 2010
@@ -240,32 +240,34 @@ cms::Session* ActiveMQConnection::create
checkClosedOrFailed();
ensureConnectionInfoSent();
- // Create and initialize a new SessionInfo object
- Pointer<SessionInfo> sessionInfo( new SessionInfo() );
- decaf::lang::Pointer<SessionId> sessionId( new SessionId() );
- sessionId->setConnectionId( this->config->connectionInfo->getConnectionId()->getValue() );
- sessionId->setValue( this->config->sessionIds.getNextSequenceId() );
- sessionInfo->setSessionId( sessionId );
- sessionInfo->setAckMode( ackMode );
-
- // Send the subscription message to the broker.
- syncRequest( sessionInfo );
-
// Create the session instance.
ActiveMQSession* session = new ActiveMQSession(
- sessionInfo, ackMode, *this->config->properties, this );
+ this, getNextSessionId(), ackMode, *this->config->properties );
+
+ return session;
+ }
+ AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<SessionId> ActiveMQConnection::getNextSessionId() {
+
+ decaf::lang::Pointer<SessionId> sessionId( new SessionId() );
+ sessionId->setConnectionId( this->config->connectionInfo->getConnectionId()->getValue() );
+ sessionId->setValue( this->config->sessionIds.getNextSequenceId() );
+
+ return sessionId;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::addSession( ActiveMQSession* session ) {
- // Add the session to the set of active sessions.
+ try {
+
+ // Add this session to the set of active sessions.
synchronized( &activeSessions ) {
activeSessions.add( session );
}
-
- // If we're already started, start the session.
- if( this->started.get() ) {
- session->start();
- }
-
- return session;
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
@@ -816,7 +818,7 @@ void ActiveMQConnection::oneway( Pointer
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::syncRequest( Pointer<Command> command, unsigned int timeout ) {
+Pointer<Response> ActiveMQConnection::syncRequest( Pointer<Command> command, unsigned int timeout ) {
try {
@@ -841,6 +843,8 @@ void ActiveMQConnection::syncRequest( Po
// Throw the exception.
throw exception;
}
+
+ return response;
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCH_EXCEPTION_CONVERT( IOException, ActiveMQException )
@@ -1143,11 +1147,6 @@ void ActiveMQConnection::setProducerWind
}
////////////////////////////////////////////////////////////////////////////////
-long long ActiveMQConnection::getNextSessionId() {
- return this->config->sessionIds.getNextSequenceId();
-}
-
-////////////////////////////////////////////////////////////////////////////////
long long ActiveMQConnection::getNextTempDestinationId() {
return this->config->tempDestinationIds.getNextSequenceId();
}
@@ -1190,3 +1189,8 @@ std::string ActiveMQConnection::getResou
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
+
+////////////////////////////////////////////////////////////////////////////////
+const decaf::util::Properties& ActiveMQConnection::getProperties() const {
+ return *( this->config->properties );
+}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?rev=1036054&r1=1036053&r2=1036054&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h Wed Nov 17 14:21:19 2010
@@ -25,7 +25,9 @@
#include <activemq/commands/ActiveMQTempDestination.h>
#include <activemq/commands/ConnectionInfo.h>
#include <activemq/commands/ConsumerInfo.h>
+#include <activemq/commands/SessionId.h>
#include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/transport/Transport.h>
#include <activemq/transport/TransportListener.h>
#include <decaf/util/Properties.h>
#include <decaf/util/StlMap.h>
@@ -140,6 +142,16 @@ namespace core{
virtual ~ActiveMQConnection() throw();
/**
+ * Adds the session resources for the given session instance.
+ *
+ * @param session
+ * The session to be added to this connection.
+ *
+ * @throws CMSException if an error occurs while performing the operation.
+ */
+ virtual void addSession( ActiveMQSession* session );
+
+ /**
* Removes the session resources for the given session instance.
*
* @param session
@@ -524,12 +536,6 @@ namespace core{
void setMessagePrioritySupported( bool value );
/**
- * Get the Next available Session Id.
- * @return the next id in the sequence.
- */
- long long getNextSessionId();
-
- /**
* Get the Next Temporary Destination Id
* @return the next id in the sequence.
*/
@@ -629,22 +635,32 @@ namespace core{
void cleanup();
/**
- * Sends a oneway message.
- * @param command The message to send.
- * @throws ConnectorException if not currently connected, or
- * if the operation fails for any reason.
+ * Sends a message without requesting that the broker send a response to indicate that
+ * it was received.
+ *
+ * @param command
+ * The Command object to send to the Broker.
+ *
+ * @throws ActiveMQException if not currently connected, or if the operation
+ * fails for any reason.
*/
void oneway( Pointer<commands::Command> command );
/**
- * Sends a synchronous request and returns the response from the broker.
- * Converts any error responses into an exception.
- * @param command The request command.
- * @param timeout The time to wait for a response, default is zero or infinite.
- * @throws ConnectorException thrown if an error response was received
- * from the broker, or if any other error occurred.
+ * Sends a synchronous request and returns the response from the broker. This
+ * method converts any error responses it receives into an exception.
+ *
+ * @param command
+ * The Command object that is to be sent to the broker.
+ * @param timeout
+ * The time in milliseconds to wait for a response, default is zero or infinite.
+ *
+ * @returns a Pointer instance to the Response object sent from the Broker.
+ *
+ * @throws BrokerException if the response from the broker is of type ExceptionResponse.
+ * @throws ActiveMQException if any other error occurs while sending the Command.
*/
- void syncRequest( Pointer<commands::Command> command, unsigned int timeout = 0 );
+ Pointer<commands::Response> syncRequest( Pointer<commands::Command> command, unsigned int timeout = 0 );
/**
* Notify the exception listener
@@ -673,26 +689,44 @@ namespace core{
*/
void onAsyncException( const decaf::lang::Exception& ex );
- private:
-
- // Sends a oneway disconnect message to the broker.
- void disconnect( long long lastDeliveredSequenceId );
-
- // Check for Closed State and Throw an exception if true.
+ /**
+ * Check for Closed State and Throw an exception if true.
+ *
+ * @throws CMSException if the Connection is closed.
+ */
void checkClosed() const;
- // Check for Closed State and Throw an exception if true.
+ /**
+ * Check for Closed State and Failed State and Throw an exception if either is true.
+ *
+ * @throws CMSException if the Connection is closed or failed.
+ */
void checkClosedOrFailed() const;
- // If its not been sent, then send the ConnectionInfo to the Broker.
+ /**
+ * If it has not been sent yet, send the ConnectionInfo to the Broker.
+ */
void ensureConnectionInfoSent();
+ protected:
+
+ /**
+ * @return the next available Session Id.
+ */
+ virtual Pointer<commands::SessionId> getNextSessionId();
+
+ // Sends a oneway disconnect message to the broker.
+ void disconnect( long long lastDeliveredSequenceId );
+
// Waits for all Consumers to handle the Transport Interrupted event.
void waitForTransportInterruptionProcessingToComplete();
// Marks processing complete for a single caller when interruption processing completes.
void signalInterruptionProcessingComplete();
+ // Allow subclasses to access the original Properties object for this connection.
+ const decaf::util::Properties& getProperties() const;
+
};
}}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp?rev=1036054&r1=1036053&r2=1036054&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp Wed Nov 17 14:21:19 2010
@@ -174,11 +174,11 @@ ActiveMQConnectionFactory::ActiveMQConne
}
////////////////////////////////////////////////////////////////////////////////
-ActiveMQConnectionFactory::ActiveMQConnectionFactory( const std::string& url,
+ActiveMQConnectionFactory::ActiveMQConnectionFactory( const std::string& uri,
const std::string& username,
const std::string& password ) : settings( new FactorySettings() ) {
- this->setBrokerURI( URI( url ) );
+ this->setBrokerURI( URI( uri ) );
// Store login data in the properties
if( !username.empty() ) {
@@ -272,7 +272,7 @@ cms::Connection* ActiveMQConnectionFacto
Pointer<Properties> properties( this->settings->properties->clone() );
// Create and Return the new connection object.
- connection.reset( new ActiveMQConnection( transport, properties ) );
+ connection.reset( createActiveMQConnection( transport, properties ) );
// Set all options parsed from the URI.
configureConnection( connection.get() );
@@ -304,6 +304,14 @@ cms::Connection* ActiveMQConnectionFacto
}
////////////////////////////////////////////////////////////////////////////////
+ActiveMQConnection* ActiveMQConnectionFactory::createActiveMQConnection(
+ const Pointer<transport::Transport>& transport,
+ const Pointer<decaf::util::Properties>& properties ) {
+
+ return new ActiveMQConnection( transport, properties );
+}
+
+////////////////////////////////////////////////////////////////////////////////
cms::Connection* ActiveMQConnectionFactory::createConnection(
const std::string& uri,
const std::string& username,
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h?rev=1036054&r1=1036053&r2=1036054&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h Wed Nov 17 14:21:19 2010
@@ -22,11 +22,16 @@
#include <cms/ConnectionFactory.h>
#include <cms/Connection.h>
+#include <activemq/transport/Transport.h>
+
#include <decaf/net/URI.h>
+#include <decaf/util/Properties.h>
namespace activemq{
namespace core{
+ using decaf::lang::Pointer;
+
class ActiveMQConnection;
class FactorySettings;
class PrefetchPolicy;
@@ -49,11 +54,11 @@ namespace core{
/**
* Constructor
- * @param url the URL of the Broker we are connecting to.
+ * @param uri the URI of the Broker we are connecting to.
* @param username to authenticate with, defaults to ""
* @param password to authenticate with, defaults to ""
*/
- ActiveMQConnectionFactory( const std::string& url,
+ ActiveMQConnectionFactory( const std::string& uri,
const std::string& username = "",
const std::string& password = "" );
@@ -382,6 +387,23 @@ namespace core{
const std::string& password,
const std::string& clientId = "" );
+ protected:
+
+ /**
+ * Create a new ActiveMQConnection instance using the provided Transport and Properties.
+ * Subclasses can override this to control the actual type of ActiveMQConnection that
+ * is created.
+ *
+ * @param transport
+ * The Transport that the Connection should use to communicate with the Broker.
+ * @param properties
+ * The Properties that are assigned to the new Connection instance.
+ *
+ * @returns a new ActiveMQConnection pointer instance.
+ */
+ virtual ActiveMQConnection* createActiveMQConnection( const Pointer<transport::Transport>& transport,
+ const Pointer<decaf::util::Properties>& properties );
+
private:
cms::Connection* doCreateConnection( const decaf::net::URI& uri,
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?rev=1036054&r1=1036053&r2=1036054&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Wed Nov 17 14:21:19 2010
@@ -67,30 +67,40 @@ using namespace decaf::lang;
using namespace decaf::lang::exceptions;
////////////////////////////////////////////////////////////////////////////////
-ActiveMQSession::ActiveMQSession( const Pointer<SessionInfo>& sessionInfo,
+ActiveMQSession::ActiveMQSession( ActiveMQConnection* connection,
+ const Pointer<SessionId>& id,
cms::Session::AcknowledgeMode ackMode,
- const Properties& properties,
- ActiveMQConnection* connection ) {
+ const Properties& properties ) {
- if( sessionInfo == NULL || connection == NULL ) {
+ if( id == NULL || connection == NULL ) {
throw ActiveMQException(
__FILE__, __LINE__,
- "ActiveMQSession::ActiveMQSession - Init with NULL data");
+ "ActiveMQSession::ActiveMQSession - Constructor called with NULL data");
}
- this->sessionInfo = sessionInfo;
+ this->sessionInfo.reset( new SessionInfo() );
+ this->sessionInfo->setAckMode( ackMode );
+ this->sessionInfo->setSessionId( id );
+
+ connection->oneway( this->sessionInfo );
+
this->connection = connection;
this->closed = false;
this->ackMode = ackMode;
this->lastDeliveredSequenceId = -1;
- // Create a Transaction object only if the session is transacted
- if( this->isTransacted() ) {
- this->transaction.reset( new ActiveMQTransactionContext( this, properties ) );
- }
+ // Create a Transaction object
+ this->transaction.reset( new ActiveMQTransactionContext( this, properties ) );
// Create the session executor object.
this->executor.reset( new ActiveMQSessionExecutor( this ) );
+
+ this->connection->addSession( this );
+
+ // If the connection is already started, start the session.
+ if( this->connection->isStarted() ) {
+ this->start();
+ }
}
////////////////////////////////////////////////////////////////////////////////
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h?rev=1036054&r1=1036053&r2=1036054&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h Wed Nov 17 14:21:19 2010
@@ -67,7 +67,7 @@ namespace core{
friend class ActiveMQSessionExecutor;
- private:
+ protected:
/**
* SessionInfo for this Session
@@ -136,10 +136,10 @@ namespace core{
public:
- ActiveMQSession( const Pointer<commands::SessionInfo>& sessionInfo,
+ ActiveMQSession( ActiveMQConnection* connection,
+ const Pointer<commands::SessionId>& id,
cms::Session::AcknowledgeMode ackMode,
- const decaf::util::Properties& properties,
- ActiveMQConnection* connection );
+ const decaf::util::Properties& properties );
virtual ~ActiveMQSession() throw();
@@ -147,7 +147,7 @@ namespace core{
* Redispatches the given set of unconsumed messages to the consumers.
* @param unconsumedMessages - unconsumed messages to be redelivered.
*/
- void redispatch( MessageDispatchChannel& unconsumedMessages );
+ virtual void redispatch( MessageDispatchChannel& unconsumedMessages );
/**
* Stops asynchronous message delivery.
@@ -165,19 +165,19 @@ namespace core{
*/
bool isStarted() const;
- bool isAutoAcknowledge() const {
+ virtual bool isAutoAcknowledge() const {
return this->ackMode == cms::Session::AUTO_ACKNOWLEDGE;
}
- bool isDupsOkAcknowledge() const {
+ virtual bool isDupsOkAcknowledge() const {
return this->ackMode == cms::Session::DUPS_OK_ACKNOWLEDGE;
}
- bool isClientAcknowledge() const {
+ virtual bool isClientAcknowledge() const {
return this->ackMode == cms::Session::CLIENT_ACKNOWLEDGE;
}
- bool isIndividualAcknowledge() const {
+ virtual bool isIndividualAcknowledge() const {
return this->ackMode == cms::Session::INDIVIDUAL_ACKNOWLEDGE;
}
@@ -399,7 +399,7 @@ namespace core{
*
* @throw ActiveMQException if this is not a Transacted Session.
*/
- void doStartTransaction();
+ virtual void doStartTransaction();
/**
* Gets the Pointer to this Session's TransactionContext
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp?rev=1036054&r1=1036053&r2=1036054&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp Wed Nov 17 14:21:19 2010
@@ -16,14 +16,24 @@
*/
#include "ActiveMQTransactionContext.h"
+#include <cms/Xid.h>
+#include <cms/XAException.h>
#include <activemq/core/ActiveMQSession.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/core/ActiveMQConstants.h>
#include <activemq/commands/TransactionInfo.h>
+#include <activemq/commands/Response.h>
+#include <activemq/commands/IntegerResponse.h>
+#include <activemq/commands/DataArrayResponse.h>
+#include <activemq/commands/LocalTransactionId.h>
+#include <activemq/commands/XATransactionId.h>
+#include <activemq/util/CMSExceptionSupport.h>
#include <decaf/lang/exceptions/NullPointerException.h>
#include <decaf/lang/Integer.h>
#include <decaf/lang/Long.h>
#include <decaf/util/Iterator.h>
+#include <decaf/util/StlList.h>
+#include <decaf/util/concurrent/ConcurrentStlMap.h>
using namespace std;
using namespace cms;
@@ -31,6 +41,7 @@ using namespace activemq;
using namespace activemq::core;
using namespace activemq::commands;
using namespace activemq::exceptions;
+using namespace activemq::util;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;
@@ -38,8 +49,56 @@ using namespace decaf::util;
using namespace decaf::util::concurrent;
////////////////////////////////////////////////////////////////////////////////
+namespace activemq{
+namespace core{
+
+ class TxContextData {
+ public:
+
+ // Id of the transaction in progress (local or XA); NULL when no transaction is active.
+ Pointer<commands::TransactionId> transactionId;
+
+ // To track XA transactions.
+ Pointer<Xid> associatedXid;
+ int beforeEndIndex;
+
+ // Global collection of all Ended XA Transactions.
+// static ConcurrentStlMap< Pointer<TransactionId>,
+// StlList<Synchronization*>,
+// TransactionId::COMPARATOR >* ENDED_XA_TRANSACTION_CONTEXTS;
+
+ TxContextData() : beforeEndIndex( 0 ) { // never leave the index indeterminate
+ }
+
+ };
+
+}}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+ class Finally {
+ private:
+
+ decaf::util::StlSet< Pointer<Synchronization> >* syncs;
+
+ public:
+
+ Finally( decaf::util::StlSet< Pointer<Synchronization> >* syncs ) : syncs( syncs ) {
+ }
+
+ ~Finally() {
+ if( this->syncs != NULL ) {
+ this->syncs->clear();
+ }
+ }
+ };
+
+}
+
+////////////////////////////////////////////////////////////////////////////////
ActiveMQTransactionContext::ActiveMQTransactionContext( ActiveMQSession* session,
- const Properties& properties AMQCPP_UNUSED) {
+ const Properties& properties AMQCPP_UNUSED ) : context(NULL) {
try {
if( session == NULL ) {
@@ -49,6 +108,8 @@ ActiveMQTransactionContext::ActiveMQTran
"Initialized with a NULL session data");
}
+ this->context = new TxContextData();
+
// Store State Data
this->session = session;
this->connection = session->getConnection();
@@ -60,6 +121,10 @@ ActiveMQTransactionContext::ActiveMQTran
////////////////////////////////////////////////////////////////////////////////
ActiveMQTransactionContext::~ActiveMQTransactionContext() {
+ try{
+ delete this->context;
+ }
+ AMQ_CATCHALL_NOTHROW()
}
////////////////////////////////////////////////////////////////////////////////
@@ -85,7 +150,7 @@ void ActiveMQTransactionContext::begin()
if( !isInTransaction() ) {
- synchronized( &synchronizations ) {
+ synchronized( &this->synchronizations ) {
this->synchronizations.clear();
}
@@ -102,7 +167,7 @@ void ActiveMQTransactionContext::begin()
this->connection->oneway( transactionInfo );
- this->transactionId = id.dynamicCast<TransactionId>();
+ this->context->transactionId = id.dynamicCast<TransactionId>();
}
}
AMQ_CATCH_RETHROW( ActiveMQException )
@@ -115,7 +180,7 @@ void ActiveMQTransactionContext::commit(
try{
- if( this->transactionId.get() == NULL ) {
+ if( this->context->transactionId.get() == NULL ) {
throw InvalidStateException(
__FILE__, __LINE__,
"ActiveMQTransactionContext::commit - "
@@ -127,11 +192,11 @@ void ActiveMQTransactionContext::commit(
// Create and Populate the Info Command.
Pointer<TransactionInfo> info( new TransactionInfo() );
info->setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
- info->setTransactionId( this->transactionId );
+ info->setTransactionId( this->context->transactionId );
info->setType( ActiveMQConstants::TRANSACTION_STATE_COMMITONEPHASE );
- // Before we send the command null the id in case of an exception.
- this->transactionId.reset( NULL );
+ // Before sending the command, reset the id to NULL in case of an exception.
+ this->context->transactionId.reset( NULL );
// Commit the current Transaction
this->connection->syncRequest( info );
@@ -148,7 +213,7 @@ void ActiveMQTransactionContext::rollbac
try{
- if( this->transactionId == NULL ) {
+ if( this->context->transactionId == NULL ) {
throw InvalidStateException(
__FILE__, __LINE__,
"ActiveMQTransactionContext::rollback - "
@@ -160,11 +225,11 @@ void ActiveMQTransactionContext::rollbac
// Create and Populate the Info Command.
Pointer<TransactionInfo> info( new TransactionInfo() );
info->setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
- info->setTransactionId( this->transactionId );
+ info->setTransactionId( this->context->transactionId );
info->setType( ActiveMQConstants::TRANSACTION_STATE_ROLLBACK );
- // Before we send the command null the id in case of an exception.
- this->transactionId.reset( NULL );
+ // Before sending the command, reset the id to NULL in case of an exception.
+ this->context->transactionId.reset( NULL );
// Roll back the current Transaction
this->connection->syncRequest( info );
@@ -197,6 +262,8 @@ void ActiveMQTransactionContext::afterCo
// Notify each registered Synchronization that we committed this Transaction.
synchronized( &this->synchronizations ) {
+ Finally finalizer( &this->synchronizations );
+
std::auto_ptr<decaf::util::Iterator< Pointer<Synchronization> > > iter(
this->synchronizations.iterator() );
@@ -212,6 +279,8 @@ void ActiveMQTransactionContext::afterRo
// Notify each registered Synchronization that we rolled back this Transaction.
synchronized( &this->synchronizations ) {
+ Finally finalizer( &this->synchronizations );
+
std::auto_ptr<decaf::util::Iterator< Pointer<Synchronization> > > iter(
this->synchronizations.iterator() );
@@ -223,15 +292,510 @@ void ActiveMQTransactionContext::afterRo
////////////////////////////////////////////////////////////////////////////////
const Pointer<TransactionId>& ActiveMQTransactionContext::getTransactionId() const {
- if( this->transactionId == NULL ) {
+ if( this->context->transactionId == NULL ) {
throw decaf::lang::exceptions::InvalidStateException(
__FILE__, __LINE__, "Transaction Not Started." );
}
- return transactionId;
+ return this->context->transactionId;
}
////////////////////////////////////////////////////////////////////////////////
bool ActiveMQTransactionContext::isInTransaction() const {
- return this->transactionId != NULL;
+ return this->context->transactionId != NULL;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQTransactionContext::isInLocalTransaction() const {
+
+    // Without a transaction id there is no transaction of any kind.
+    if( this->context->transactionId == NULL ) {
+        return false;
+    }
+
+    // Otherwise the id itself reports whether it is a local transaction id.
+    return this->context->transactionId->isLocalTransactionId();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQTransactionContext::isInXATransaction() const {
+
+    // Without a transaction id there is no transaction of any kind.
+    if( this->context->transactionId == NULL ) {
+        return false;
+    }
+
+    // Otherwise the id itself reports whether it is an XA transaction id.
+    return this->context->transactionId->isXATransactionId();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int ActiveMQTransactionContext::getTransactionTimeout() const {
+    // No timeout value is stored by this class; the reported value is always zero.
+    const int timeout = 0;
+    return timeout;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQTransactionContext::setTransactionTimeout( int seconds AMQCPP_UNUSED ) {
+    // The requested value is ignored and the call always reports false.
+    const bool accepted = false;
+    return accepted;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int ActiveMQTransactionContext::recover( int flag AMQCPP_UNUSED, Xid** recovered ) {
+
+    // Build the RECOVER request up front; only the broker exchange below is
+    // wrapped in the exception conversion.
+    Pointer<TransactionInfo> request( new TransactionInfo() );
+    request->setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
+    request->setType( ActiveMQConstants::TRANSACTION_STATE_RECOVER );
+
+    try {
+
+        this->connection->checkClosedOrFailed();
+        this->connection->ensureConnectionInfoSent();
+
+        Pointer<Response> reply = this->connection->syncRequest( request );
+        Pointer<DataArrayResponse> arrayReply = reply.dynamicCast<DataArrayResponse>();
+
+        std::vector< Pointer<DataStructure> > entries = arrayReply->getData();
+
+        const int count = (int)entries.size();
+
+        // Nothing was returned, so no array is allocated.
+        if( count <= 0 ) {
+            return count;
+        }
+
+        // NOTE(review): 'recovered' is received by value, so this assignment only
+        // changes the local copy of the pointer.  The caller never sees the new
+        // array and the array plus its cloned Xids appear to be unreachable after
+        // return.  Fixing this requires a signature change - confirm the intended
+        // contract with the cms::XAResource declaration.
+        recovered = new Xid*[entries.size()];
+
+        // Each element is cloned from the XATransactionId held in the response.
+        for( int ix = 0; ix < count; ++ix ) {
+            Pointer<XATransactionId> id = entries[ix].dynamicCast<XATransactionId>();
+            recovered[ix] = id->clone();
+        }
+
+        return count;
+
+    } catch( Exception& e ) {
+        throw toXAException( e );
+    } catch( CMSException& e ) {
+        throw toXAException( e );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Associates this context with the given Xid and begins the XA transaction.
+//
+// @param xid   the Xid to associate with; handed to setXid().
+// @param flags XA start flags; TMJOIN and TMRESUME are rejected.
+// @throws XAException XAER_PROTO when a local transaction is active or an Xid
+//         is already associated, or a message-only XAException when TMJOIN or
+//         TMRESUME is requested.
+void ActiveMQTransactionContext::start( const Xid* xid, int flags ) {
+
+    if( this->isInLocalTransaction() ) {
+        throw XAException( XAException::XAER_PROTO );
+    }
+
+    // Are we already associated?  The exception is thrown by value (not with
+    // 'new') so handlers catching XAException by reference receive it and no
+    // heap object is leaked.
+    if( this->context->associatedXid != NULL ) {
+        throw XAException( XAException::XAER_PROTO );
+    }
+
+    const char* txSuspendResumeNotSupportMsg =
+        "The suspend/resume of a transaction " \
+        "is not supported. Instead it is recommended " \
+        "that a new JMS session be created.";
+
+    if( ( flags & TMJOIN ) == TMJOIN ) {
+        throw XAException( txSuspendResumeNotSupportMsg );
+    }
+    if( ( flags & TMRESUME ) == TMRESUME ) {
+        throw XAException( txSuspendResumeNotSupportMsg );
+    }
+
+    // prepare for a new association
+    this->synchronizations.clear();
+    this->context->beforeEndIndex = 0;
+
+    this->setXid( xid );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int ActiveMQTransactionContext::prepare( const Xid* xid ) {
+
+    // A missing Xid is rejected, and so is the Xid still associated with this
+    // context: end( xid, TMSUCCESS ) should already have dissociated it.
+    if( xid == NULL || equals( this->context->associatedXid.get(), xid ) ) {
+        throw XAException( XAException::XAER_PROTO );
+    }
+
+    // Interleaving of multiple transactions is allowed, so any other Xid is
+    // accepted and wrapped in a fresh transaction id.
+    Pointer<XATransactionId> txId( new XATransactionId( xid ) );
+
+    try {
+
+        Pointer<TransactionInfo> command( new TransactionInfo() );
+        command->setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
+        command->setTransactionId( txId );
+        command->setType( ActiveMQConstants::TRANSACTION_STATE_PREPARE );
+
+        // Ask the broker for its vote on this transaction.
+        Pointer<Response> reply = this->connection->syncRequest( command );
+        Pointer<IntegerResponse> vote = reply.dynamicCast<IntegerResponse>();
+
+        const int outcome = vote->getResult();
+
+        // A read-only vote ends the transaction here, so the registered
+        // synchronizations get their after-commit callback now.
+        // TODO: the commented-out reference code notified every context recorded
+        // in ENDED_XA_TRANSACTION_CONTEXTS for this id; only this context is
+        // notified at present.
+        if( XAResource::XA_RDONLY == outcome ) {
+            this->afterCommit();
+        }
+
+        return outcome;
+
+    } catch( Exception& e ) {
+
+        // Best effort notification; failures while notifying are ignored.
+        try{
+            this->afterRollback();
+        } catch(...) {
+        }
+
+        throw toXAException( e );
+
+    } catch( CMSException& e ) {
+
+        // Best effort notification; failures while notifying are ignored.
+        try{
+            this->afterRollback();
+        } catch(...) {
+        }
+
+        throw toXAException( e );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Commits the transaction branch identified by xid on the broker.
+//
+// @param xid      branch to commit; must be non-NULL and must not be the Xid
+//                 currently associated with this context.
+// @param onePhase true to send a one-phase commit, false to send the commit
+//                 of a two-phase protocol.
+// @throws XAException XAER_PROTO for a rejected xid; otherwise the failure
+//         converted by toXAException().
+void ActiveMQTransactionContext::commit( const Xid* xid, bool onePhase ) {
+
+    // We allow interleaving multiple transactions, so we don't limit commit to the associated xid.
+    Pointer<XATransactionId> x;
+
+    // THIS SHOULD NEVER HAPPEN because end(xid, TMSUCCESS) should have been called first
+    if( xid == NULL || equals( this->context->associatedXid.get(), xid ) ) {
+        throw XAException( XAException::XAER_PROTO );
+    } else {
+        x.reset( new XATransactionId( xid ) );
+    }
+
+    try {
+
+        this->connection->checkClosedOrFailed();
+        this->connection->ensureConnectionInfoSent();
+
+        // Let the server know that the tx is to be committed.  The two arms of
+        // the selection must differ: previously both picked the one-phase
+        // constant, so a two-phase commit was never sent.
+        Pointer<TransactionInfo> info( new TransactionInfo() );
+        info->setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
+        info->setTransactionId( x );
+        info->setType( onePhase ? ActiveMQConstants::TRANSACTION_STATE_COMMITONEPHASE :
+                                  ActiveMQConstants::TRANSACTION_STATE_COMMITTWOPHASE );
+
+        this->connection->syncRequest( info );
+
+//        List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
+//        if (l != NULL && !l.isEmpty()) {
+//            for (TransactionContext ctx : l) {
+//                ctx.afterCommit();
+//            }
+//        }
+
+        this->afterCommit();
+
+    } catch( Exception& ex ) {
+
+        // Best effort notification; failures while notifying are ignored so
+        // the original error is the one reported.
+        try {
+            this->afterRollback();
+        } catch(...) {
+        }
+
+        throw toXAException( ex );
+
+    } catch( CMSException& e ) {
+//        List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
+//        if (l != NULL && !l.isEmpty()) {
+//            for (TransactionContext ctx : l) {
+//                try {
+//                    ctx.afterRollback();
+//                } catch(...) {
+//                }
+//            }
+//        }
+
+        try {
+            this->afterRollback();
+        } catch(...) {
+        }
+
+        throw toXAException( e );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQTransactionContext::rollback( const Xid* xid ) {
+
+    if( xid == NULL ) {
+        throw XAException( XAException::XAER_PROTO );
+    }
+
+    // Interleaving of multiple transactions is allowed.  When the caller names
+    // the Xid associated with this context its existing id is reused, otherwise
+    // a fresh id is built from the caller's Xid.
+    Pointer<XATransactionId> txId;
+    if( !equals( this->context->associatedXid.get(), xid ) ) {
+        txId.reset( new XATransactionId( xid ) );
+    } else {
+        txId = this->context->transactionId.dynamicCast<XATransactionId>();
+    }
+
+    try {
+
+        this->connection->checkClosedOrFailed();
+        this->connection->ensureConnectionInfoSent();
+
+        // Tell the broker to roll the transaction back.
+        Pointer<TransactionInfo> command( new TransactionInfo() );
+        command->setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
+        command->setTransactionId( txId );
+        command->setType( ActiveMQConstants::TRANSACTION_STATE_ROLLBACK );
+
+        this->connection->syncRequest( command );
+
+        // TODO: the commented-out reference code notified every context recorded
+        // in ENDED_XA_TRANSACTION_CONTEXTS for this id; only this context is
+        // notified at present.
+        this->afterRollback();
+
+    } catch( Exception& e ) {
+        throw toXAException( e );
+    } catch( CMSException& e ) {
+        throw toXAException( e );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQTransactionContext::end( const Xid* xid, int flags ) {
+
+    if( isInLocalTransaction() ) {
+        throw XAException( XAException::XAER_PROTO );
+    }
+
+    // Classify the request once; suspend/fail takes precedence over success.
+    const bool suspendOrFail = ( flags & ( TMSUSPEND | TMFAIL ) ) != 0;
+    const bool success = ( flags & TMSUCCESS ) == TMSUCCESS;
+
+    if( !suspendOrFail && !success ) {
+        throw XAException( XAException::XAER_INVAL );
+    }
+
+    const bool isAssociated = equals( this->context->associatedXid.get(), xid );
+
+    if( !isAssociated ) {
+
+        // You can only suspend the associated xid.
+        if( suspendOrFail ) {
+            throw XAException( XAException::XAER_PROTO );
+        }
+
+        // A success call for some other xid could be an asynchronous success
+        // call; there is nothing to dissociate in that case.
+        return;
+    }
+
+    // The xid is the current one: run the before-end processing and then drop
+    // the association.
+    try {
+        this->beforeEnd();
+    } catch( Exception& e ) {
+        throw toXAException( e );
+    } catch( CMSException& e ) {
+        throw toXAException( e );
+    }
+
+    setXid( NULL );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQTransactionContext::forget( const Xid* xid ) {
+
+    if( xid == NULL ) {
+        throw XAException( XAException::XAER_PROTO );
+    }
+
+    // Interleaving of multiple transactions is allowed.  When the caller names
+    // the Xid associated with this context its existing id is reused, otherwise
+    // a fresh id is built from the caller's Xid.
+    Pointer<XATransactionId> txId;
+    if( !equals( this->context->associatedXid.get(), xid ) ) {
+        txId.reset( new XATransactionId( xid ) );
+    } else {
+        txId = this->context->transactionId.dynamicCast<XATransactionId>();
+    }
+
+    // Tell the broker to forget the transaction.
+    Pointer<TransactionInfo> command( new TransactionInfo() );
+    command->setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
+    command->setTransactionId( txId );
+    command->setType( ActiveMQConstants::TRANSACTION_STATE_FORGET );
+
+    try {
+        this->connection->syncRequest( command );
+    } catch( Exception& e ) {
+        throw toXAException( e );
+    } catch( CMSException& e ) {
+        throw toXAException( e );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQTransactionContext::isSameRM( const XAResource* resource ) {
+
+    // A NULL resource, or one that is not an ActiveMQTransactionContext, can
+    // never match; dynamic_cast yields NULL in both situations.
+    const ActiveMQTransactionContext* other =
+        dynamic_cast<const ActiveMQTransactionContext*>( resource );
+
+    if( other == NULL ) {
+        return false;
+    }
+
+    // Two contexts match when they report equal resource manager ids.
+    try{
+        const bool sameId = getResourceManagerId() == other->getResourceManagerId();
+        return sameId;
+    } catch( Exception& e ) {
+        throw toXAException( e );
+    } catch( CMSException& e ) {
+        throw XAException( "Could not get the Resource Manager Id.", &e );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Changes the Xid associated with this context.
+//
+// A non-NULL xid is cloned and stored as the association, a matching
+// XATransactionId is created, and a BEGIN command is sent one-way.  A NULL xid
+// sends END (synchronously) for the current transaction id, if there is one,
+// and then clears both the association and the transaction id.
+//
+// @param xid the Xid to associate with, or NULL to end the association.
+// @throws XAException when the connection check or a broker request fails.
+void ActiveMQTransactionContext::setXid( const Xid* xid ) {
+
+    try {
+        this->connection->checkClosedOrFailed();
+        this->connection->ensureConnectionInfoSent();
+    } catch( Exception& e ) {
+        throw toXAException( e );
+    } catch( CMSException& e ) {
+        throw toXAException( e );
+    }
+
+    if( xid != NULL ) {
+
+        // Associate this new Xid with this Transaction as the root of the TX.
+        this->context->associatedXid.reset( xid->clone() );
+        this->context->transactionId.reset( new XATransactionId( xid ) );
+
+        Pointer<TransactionInfo> info( new TransactionInfo() );
+        info->setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
+        info->setTransactionId( this->context->transactionId );
+        info->setType( ActiveMQConstants::TRANSACTION_STATE_BEGIN );
+
+        try {
+            this->connection->oneway( info );
+        } catch( Exception& e ) {
+            throw toXAException( e );
+        } catch( CMSException& e ) {
+            throw toXAException( e );
+        }
+
+    } else {
+
+        if( this->context->transactionId != NULL ) {
+
+            Pointer<TransactionInfo> info( new TransactionInfo() );
+            info->setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
+            info->setTransactionId( this->context->transactionId );
+            info->setType( ActiveMQConstants::TRANSACTION_STATE_END );
+
+            // Convert both exception families, as every other broker request in
+            // this class does, so only XAException leaves this method.
+            try {
+                this->connection->syncRequest( info );
+            } catch( Exception& e ) {
+                throw toXAException( e );
+            } catch( CMSException& e ) {
+                throw toXAException( e );
+            }
+
+//            // Add our self to the list of contexts that are interested in
+//            // post commit/rollback events.
+//            List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId);
+//            if (l == NULL) {
+//                l = new ArrayList<TransactionContext>(3);
+//                ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l);
+//                l.add(this);
+//            } else if (!l.contains(this)) {
+//                l.add(this);
+//            }
+        }
+
+        // remove the association currently in place.
+        this->context->associatedXid.reset( NULL );
+        this->context->transactionId.reset( NULL );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Compares two Xids by format id, branch qualifier and global transaction id.
+//
+// @param local  first Xid, may be NULL.
+// @param remote second Xid, may be NULL.
+// @return true when both pointers are the same (including both NULL) or when
+//         all three components compare equal; false when exactly one is NULL
+//         or any component differs.
+bool ActiveMQTransactionContext::equals( const cms::Xid* local, const cms::Xid* remote ) {
+
+    if( local == remote ) {
+        return true;
+    }
+
+    // Exactly one of the two is NULL.
+    if( ( local == NULL ) ^ ( remote == NULL ) ) {
+        return false;
+    }
+
+    if( local->getFormatId() != remote->getFormatId() ) {
+        return false;
+    }
+
+    // The buffers are zero-filled vectors of the maximum size, so bytes beyond
+    // what each Xid writes compare equal.
+    std::vector<unsigned char> localBQual( Xid::MAXBQUALSIZE );
+    std::vector<unsigned char> remoteBQual( Xid::MAXBQUALSIZE );
+
+    local->getBranchQualifier( &localBQual[0], Xid::MAXBQUALSIZE );
+    remote->getBranchQualifier( &remoteBQual[0], Xid::MAXBQUALSIZE );
+
+    if( localBQual != remoteBQual ) {
+        return false;
+    }
+
+    // Size these buffers with the same constant that is passed as their
+    // capacity; they were previously allocated with MAXBQUALSIZE.
+    std::vector<unsigned char> localGTXID( Xid::MAXGTRIDSIZE );
+    std::vector<unsigned char> remoteGTXID( Xid::MAXGTRIDSIZE );
+
+    local->getGlobalTransactionId( &localGTXID[0], Xid::MAXGTRIDSIZE );
+    remote->getGlobalTransactionId( &remoteGTXID[0], Xid::MAXGTRIDSIZE );
+
+    return localGTXID == remoteGTXID;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+std::string ActiveMQTransactionContext::getResourceManagerId() const {
+    // The id is owned by the parent connection; this simply forwards it.
+    std::string rmId = this->connection->getResourceManagerId();
+    return rmId;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+XAException ActiveMQTransactionContext::toXAException( decaf::lang::Exception& ex ) {
+    // Convert to a CMSException first so it can be supplied as the cause, then
+    // tag the resulting XAException with XAER_RMFAIL.
+    CMSException cause = CMSExceptionSupport::create( ex );
+    XAException converted( ex.getMessage(), &cause );
+    converted.setErrorCode( XAException::XAER_RMFAIL );
+    return converted;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+XAException ActiveMQTransactionContext::toXAException( cms::CMSException& ex ) {
+    // Wrap the CMS exception as the cause and tag the result with XAER_RMFAIL.
+    XAException converted( ex.getMessage(), &ex );
+    converted.setErrorCode( XAException::XAER_RMFAIL );
+    return converted;
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.h?rev=1036054&r1=1036053&r2=1036054&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.h Wed Nov 17 14:21:19 2010
@@ -20,6 +20,9 @@
#include <memory>
#include <cms/Message.h>
+#include <cms/XAResource.h>
+#include <cms/CMSException.h>
+#include <cms/XAException.h>
#include <activemq/util/Config.h>
#include <activemq/exceptions/ActiveMQException.h>
@@ -37,8 +40,10 @@ namespace core{
using decaf::lang::Pointer;
+ class LocalTransactionEventListener;
class ActiveMQSession;
class ActiveMQConnection;
+ class TxContextData;
/**
* Transaction Management class, hold messages that are to be redelivered
@@ -49,18 +54,18 @@ namespace core{
*
* @since 2.0
*/
- class AMQCPP_API ActiveMQTransactionContext {
+ class AMQCPP_API ActiveMQTransactionContext : public cms::XAResource {
private:
+ // Internal structure to hold all class TX data.
+ TxContextData* context;
+
// Session this Transaction is associated with
ActiveMQSession* session;
// The Connection that is the parent of the Session.
ActiveMQConnection* connection;
- // Transaction Info for the current Transaction
- Pointer<commands::TransactionId> transactionId;
-
// List of Registered Synchronizations
decaf::util::StlSet< Pointer<Synchronization> > synchronizations;
@@ -130,8 +135,52 @@ namespace core{
*/
virtual bool isInTransaction() const;
+ /**
+ * Checks to see if there is currently a Local Transaction in progress, returns
+ * false if not, true otherwise.
+ *
+ * @returns true if a Local Transaction is in progress.
+ */
+ virtual bool isInLocalTransaction() const;
+
+ /**
+ * Checks to see if there is currently an XA Transaction in progress, returns
+ * false if not, true otherwise.
+ *
+ * @returns true if an XA Transaction is in progress.
+ */
+ virtual bool isInXATransaction() const;
+
+ public: // XAResource implementation.
+
+ virtual void commit( const cms::Xid* xid, bool onePhase );
+
+ virtual void end( const cms::Xid* xid, int flags );
+
+ virtual void forget( const cms::Xid* xid );
+
+ virtual int getTransactionTimeout() const;
+
+ virtual bool isSameRM( const cms::XAResource* theXAResource );
+
+ virtual int prepare( const cms::Xid* xid );
+
+ virtual int recover(int flag, cms::Xid** recovered );
+
+ virtual void rollback( const cms::Xid* xid );
+
+ virtual bool setTransactionTimeout( int seconds );
+
+ virtual void start( const cms::Xid* xid, int flags );
+
private:
+ std::string getResourceManagerId() const;
+ void setXid( const cms::Xid* xid );
+ bool equals( const cms::Xid* local, const cms::Xid* remote );
+ cms::XAException toXAException( cms::CMSException& ex );
+ cms::XAException toXAException( decaf::lang::Exception& ex );
+
void beforeEnd();
void afterCommit();
void afterRollback();
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnection.cpp?rev=1036054&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnection.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnection.cpp Wed Nov 17 14:21:19 2010
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ActiveMQXAConnection.h"
+
+#include <activemq/core/ActiveMQXASession.h>
+#include <activemq/util/CMSExceptionSupport.h>
+
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQXAConnection::ActiveMQXAConnection(
+    const Pointer<transport::Transport>& transport,
+    const Pointer<decaf::util::Properties>& properties ) :
+    ActiveMQConnection(transport, properties ) {
+
+    // Both arguments are forwarded unchanged to the ActiveMQConnection base.
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQXAConnection::~ActiveMQXAConnection() throw() {
+    // Nothing to release here; this class adds no members of its own.
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::XASession* ActiveMQXAConnection::createXASession() {
+    // Create the session through createSession() and then view it through
+    // the cms::XASession interface.
+    cms::Session* created = this->createSession( cms::Session::SESSION_TRANSACTED );
+    return dynamic_cast<cms::XASession*>( created );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::Session* ActiveMQXAConnection::createSession( cms::Session::AcknowledgeMode ackMode AMQCPP_UNUSED ) {
+
+    try {
+
+        // Refuse to create sessions on a closed or failed connection, and make
+        // sure the connection info has been sent first.
+        checkClosedOrFailed();
+        ensureConnectionInfoSent();
+
+        // The requested acknowledge mode is unused: an ActiveMQXASession is
+        // always created.
+        return new ActiveMQXASession(
+            this, getNextSessionId(), this->getProperties() );
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnection.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnection.h?rev=1036054&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnection.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnection.h Wed Nov 17 14:21:19 2010
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CORE_ACTIVEMQXACONNECTION_H_
+#define _ACTIVEMQ_CORE_ACTIVEMQXACONNECTION_H_
+
+#include <activemq/util/Config.h>
+
+#include <cms/XAConnection.h>
+#include <activemq/core/ActiveMQConnection.h>
+
+namespace activemq {
+namespace core {
+
+ using decaf::lang::Pointer;
+
+    /**
+     * An ActiveMQConnection that also implements cms::XAConnection.
+     */
+    class AMQCPP_API ActiveMQXAConnection : public cms::XAConnection,
+                                            public ActiveMQConnection {
+    private:
+
+        // Declared private so instances cannot be copied or assigned.
+        ActiveMQXAConnection( const ActiveMQXAConnection& );
+        ActiveMQXAConnection& operator= ( const ActiveMQXAConnection& );
+
+    public:
+
+        /**
+         * @param transport  forwarded to the ActiveMQConnection constructor.
+         * @param properties forwarded to the ActiveMQConnection constructor.
+         */
+        ActiveMQXAConnection( const Pointer<transport::Transport>& transport,
+                              const Pointer<decaf::util::Properties>& properties );
+
+        virtual ~ActiveMQXAConnection() throw();
+
+        /**
+         * @returns a new session viewed through the cms::XASession interface.
+         */
+        virtual cms::XASession* createXASession();
+
+        /**
+         * @param ackMode the requested acknowledge mode.
+         * @returns a newly created session.
+         */
+        virtual cms::Session* createSession( cms::Session::AcknowledgeMode ackMode );
+
+    };
+
+}}
+
+#endif /* _ACTIVEMQ_CORE_ACTIVEMQXACONNECTION_H_ */
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnection.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnectionFactory.cpp?rev=1036054&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnectionFactory.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnectionFactory.cpp Wed Nov 17 14:21:19 2010
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ActiveMQXAConnectionFactory.h"
+
+#include <activemq/core/ActiveMQXAConnection.h>
+
+using namespace activemq;
+using namespace activemq::core;
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQXAConnectionFactory::ActiveMQXAConnectionFactory() : ActiveMQConnectionFactory() {
+    // Default construction is handled entirely by ActiveMQConnectionFactory.
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQXAConnectionFactory::ActiveMQXAConnectionFactory(
+    const std::string& uri,
+    const std::string& username,
+    const std::string& password ) :
+    ActiveMQConnectionFactory( uri, username, password ) {
+
+    // The string URI and credentials are forwarded unchanged to the base factory.
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQXAConnectionFactory::ActiveMQXAConnectionFactory(
+    const decaf::net::URI& uri,
+    const std::string& username,
+    const std::string& password ) :
+    ActiveMQConnectionFactory( uri, username, password ) {
+
+    // The URI object and credentials are forwarded unchanged to the base factory.
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQXAConnectionFactory::~ActiveMQXAConnectionFactory() throw() {
+    // Nothing to release here; this class adds no members of its own.
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::XAConnection* ActiveMQXAConnectionFactory::createXAConnection() {
+    // Create the connection through createConnection() and then view it
+    // through the cms::XAConnection interface.
+    cms::Connection* created = createConnection();
+    return dynamic_cast<cms::XAConnection*>( created );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::XAConnection* ActiveMQXAConnectionFactory::createXAConnection(
+    const std::string& userName, const std::string& password ) {
+
+    // Create the connection with the supplied credentials and then view it
+    // through the cms::XAConnection interface.
+    cms::Connection* created = createConnection( userName, password );
+    return dynamic_cast<cms::XAConnection*>( created );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQConnection* ActiveMQXAConnectionFactory::createActiveMQConnection(
+    const Pointer<transport::Transport>& transport,
+    const Pointer<decaf::util::Properties>& properties ) {
+
+    // This factory builds ActiveMQXAConnection objects, handed back through
+    // the ActiveMQConnection base pointer.
+    ActiveMQConnection* created = new ActiveMQXAConnection( transport, properties );
+    return created;
+}
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnectionFactory.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnectionFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnectionFactory.h?rev=1036054&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnectionFactory.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnectionFactory.h Wed Nov 17 14:21:19 2010
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CORE_ACTIVEMQXACONNECTIONFACTORY_H_
+#define _ACTIVEMQ_CORE_ACTIVEMQXACONNECTIONFACTORY_H_
+
+#include <activemq/util/Config.h>
+
+#include <cms/XAConnectionFactory.h>
+#include <activemq/core/ActiveMQConnectionFactory.h>
+#include <decaf/net/URI.h>
+#include <decaf/util/Properties.h>
+
+namespace activemq {
+namespace core {
+
+ using decaf::lang::Pointer;
+
+    /**
+     * An ActiveMQConnectionFactory that also implements cms::XAConnectionFactory.
+     */
+    class AMQCPP_API ActiveMQXAConnectionFactory : public cms::XAConnectionFactory,
+                                                   public ActiveMQConnectionFactory {
+    public:
+
+        /**
+         * Default constructor.
+         */
+        ActiveMQXAConnectionFactory();
+
+        /**
+         * Constructor
+         * @param uri the URI of the Broker we are connecting to.
+         * @param username to authenticate with, defaults to ""
+         * @param password to authenticate with, defaults to ""
+         */
+        ActiveMQXAConnectionFactory( const std::string& uri,
+                                     const std::string& username = "",
+                                     const std::string& password = "" );
+
+        /**
+         * Constructor
+         * @param uri the URI of the Broker we are connecting to.
+         * @param username to authenticate with, defaults to ""
+         * @param password to authenticate with, defaults to ""
+         */
+        ActiveMQXAConnectionFactory( const decaf::net::URI& uri,
+                                     const std::string& username = "",
+                                     const std::string& password = "" );
+
+        virtual ~ActiveMQXAConnectionFactory() throw();
+
+        /**
+         * @returns a new connection viewed through the cms::XAConnection interface.
+         */
+        virtual cms::XAConnection* createXAConnection();
+
+        /**
+         * @param userName to authenticate with.
+         * @param password to authenticate with.
+         * @returns a new connection viewed through the cms::XAConnection interface.
+         */
+        virtual cms::XAConnection* createXAConnection( const std::string& userName,
+                                                       const std::string& password );
+
+    protected:
+
+        /**
+         * @param transport  the transport handed to the new connection.
+         * @param properties the properties handed to the new connection.
+         * @returns a newly allocated connection object.
+         */
+        virtual ActiveMQConnection* createActiveMQConnection( const Pointer<transport::Transport>& transport,
+                                                              const Pointer<decaf::util::Properties>& properties );
+
+    };
+
+}}
+
+#endif /* _ACTIVEMQ_CORE_ACTIVEMQXACONNECTIONFACTORY_H_ */
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnectionFactory.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXASession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXASession.cpp?rev=1036054&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXASession.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXASession.cpp Wed Nov 17 14:21:19 2010
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ActiveMQXASession.h"
+
+#include <cms/TransactionInProgressException.h>
+#include <activemq/core/ActiveMQTransactionContext.h>
+
+using namespace activemq;
+using namespace activemq::core;
+using namespace decaf::lang;
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQXASession::ActiveMQXASession( ActiveMQConnection* connection,
+                                      const Pointer<commands::SessionId>& sessionId,
+                                      const decaf::util::Properties& properties )
+    // The base session is always constructed in AUTO_ACKNOWLEDGE mode; the
+    // XA behaviour comes from the method overrides in this class.
+    : ActiveMQSession( connection, sessionId, cms::Session::AUTO_ACKNOWLEDGE, properties ) {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Nothing to release here: this class adds no members of its own.
+ActiveMQXASession::~ActiveMQXASession() throw() {}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQXASession::isTransacted() const {
+    // The session counts as transacted only while the transaction context
+    // reports that an XA transaction is in progress.
+    const bool insideXATransaction = this->transaction->isInXATransaction();
+    return insideXATransaction;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQXASession::isAutoAcknowledge() const {
+    // Always report auto-acknowledge so that, whenever no XA Transaction is
+    // active, the Session behaves like a plain Auto Ack session.
+    const bool alwaysAutoAck = true;
+    return alwaysAutoAck;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQXASession::doStartTransaction() {
+    // Intentionally empty: transaction start is controlled by the XAResource,
+    // so the session itself has nothing to do here.
+    return;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQXASession::commit() {
+    // A local commit is never allowed on an XA session; this call always
+    // fails with a TransactionInProgressException.
+    throw cms::TransactionInProgressException(
+        "Cannot commit inside an XASession" );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQXASession::rollback() {
+    // A local rollback is never allowed on an XA session; this call always
+    // fails with a TransactionInProgressException.
+    throw cms::TransactionInProgressException(
+        "Cannot rollback inside an XASession" );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::XAResource* ActiveMQXASession::getXAResource() const {
+    // The session's transaction context doubles as its XAResource; hand out
+    // the raw pointer held by the session (no new object is created).
+    cms::XAResource* resource = this->transaction.get();
+    return resource;
+}
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXASession.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXASession.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXASession.h?rev=1036054&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXASession.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXASession.h Wed Nov 17 14:21:19 2010
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CORE_ACTIVEMQXASESSION_H_
+#define _ACTIVEMQ_CORE_ACTIVEMQXASESSION_H_
+
+#include <activemq/util/Config.h>
+
+#include <cms/XASession.h>
+#include <activemq/core/ActiveMQSession.h>
+
+namespace activemq {
+namespace core {
+
+ using decaf::lang::Pointer;
+
+    /**
+     * Session implementation exposing the cms::XASession interface.
+     *
+     * Extends ActiveMQSession and overrides the transaction related methods
+     * so that transaction boundaries are driven through the XAResource
+     * returned by getXAResource() rather than by commit() / rollback().
+     */
+    class AMQCPP_API ActiveMQXASession :
+        public cms::XASession, public ActiveMQSession {
+    public:
+
+        /**
+         * Creates the XA session; the underlying ActiveMQSession is created
+         * with the cms::Session::AUTO_ACKNOWLEDGE mode.
+         *
+         * @param connection
+         *      The connection this session belongs to.
+         * @param sessionId
+         *      The id assigned to this session.
+         * @param properties
+         *      The properties passed on to the base session.
+         */
+        ActiveMQXASession( ActiveMQConnection* connection,
+                           const Pointer<commands::SessionId>& sessionId,
+                           const decaf::util::Properties& properties );
+
+        virtual ~ActiveMQXASession() throw();
+
+    public:  // ActiveMQSession methods overridden to make them XA aware
+
+        /**
+         * @return true while the session's transaction context reports that
+         *         it is in an XA transaction, false otherwise.
+         */
+        virtual bool isTransacted() const;
+
+        /**
+         * @return always true, so the session acts like an auto-acknowledge
+         *         session when there is no active XA transaction.
+         */
+        virtual bool isAutoAcknowledge() const;
+
+        /**
+         * No-op for XA sessions; starting a transaction is controlled by the
+         * XAResource.
+         */
+        virtual void doStartTransaction();
+
+        /**
+         * Not usable on an XA session.
+         *
+         * @throws cms::TransactionInProgressException always.
+         */
+        virtual void commit();
+
+        /**
+         * Not usable on an XA session.
+         *
+         * @throws cms::TransactionInProgressException always.
+         */
+        virtual void rollback();
+
+    public:  // cms::XASession methods
+
+        /**
+         * @return the XAResource for this session, which is the session's own
+         *         transaction context object.
+         *         NOTE(review): the pointer presumably remains owned by the
+         *         session and must not be deleted by the caller - confirm.
+         */
+        virtual cms::XAResource* getXAResource() const;
+
+    };
+
+}}
+
+#endif /* _ACTIVEMQ_CORE_ACTIVEMQXASESSION_H_ */
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXASession.h
------------------------------------------------------------------------------
svn:eol-style = native