You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/11/17 15:21:19 UTC

svn commit: r1036054 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main: ./ activemq/core/

Author: tabish
Date: Wed Nov 17 14:21:19 2010
New Revision: 1036054

URL: http://svn.apache.org/viewvc?rev=1036054&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQCPP-329

Initial XA Support added.

Added:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnection.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnection.h   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnectionFactory.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnectionFactory.h   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXASession.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXASession.h   (with props)
Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am?rev=1036054&r1=1036053&r2=1036054&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am Wed Nov 17 14:21:19 2010
@@ -94,6 +94,9 @@ cc_sources = \
     activemq/core/ActiveMQSession.cpp \
     activemq/core/ActiveMQSessionExecutor.cpp \
     activemq/core/ActiveMQTransactionContext.cpp \
+    activemq/core/ActiveMQXAConnection.cpp \
+    activemq/core/ActiveMQXAConnectionFactory.cpp \
+    activemq/core/ActiveMQXASession.cpp \
     activemq/core/FifoMessageDispatchChannel.cpp \
     activemq/core/PrefetchPolicy.cpp \
     activemq/core/RedeliveryPolicy.cpp \
@@ -261,6 +264,8 @@ cc_sources = \
     cms/TemporaryTopic.cpp \
     cms/TextMessage.cpp \
     cms/Topic.cpp \
+    cms/TransactionInProgressException.cpp \
+    cms/TransactionRolledBackException.cpp \
     cms/UnsupportedOperationException.cpp \
     cms/XAConnection.cpp \
     cms/XAConnectionFactory.cpp \
@@ -523,6 +528,9 @@ h_sources = \
     activemq/core/ActiveMQSession.h \
     activemq/core/ActiveMQSessionExecutor.h \
     activemq/core/ActiveMQTransactionContext.h \
+    activemq/core/ActiveMQXAConnection.h \
+    activemq/core/ActiveMQXAConnectionFactory.h \
+    activemq/core/ActiveMQXASession.h \
     activemq/core/DispatchData.h \
     activemq/core/Dispatcher.h \
     activemq/core/FifoMessageDispatchChannel.h \
@@ -717,6 +725,8 @@ h_sources = \
     cms/TemporaryTopic.h \
     cms/TextMessage.h \
     cms/Topic.h \
+    cms/TransactionInProgressException.h \
+    cms/TransactionRolledBackException.h \
     cms/UnsupportedOperationException.h \
     cms/XAConnection.h \
     cms/XAConnectionFactory.h \

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=1036054&r1=1036053&r2=1036054&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Wed Nov 17 14:21:19 2010
@@ -240,32 +240,34 @@ cms::Session* ActiveMQConnection::create
         checkClosedOrFailed();
         ensureConnectionInfoSent();
 
-        // Create and initialize a new SessionInfo object
-        Pointer<SessionInfo> sessionInfo( new SessionInfo() );
-        decaf::lang::Pointer<SessionId> sessionId( new SessionId() );
-        sessionId->setConnectionId( this->config->connectionInfo->getConnectionId()->getValue() );
-        sessionId->setValue( this->config->sessionIds.getNextSequenceId() );
-        sessionInfo->setSessionId( sessionId );
-        sessionInfo->setAckMode( ackMode );
-
-        // Send the subscription message to the broker.
-        syncRequest( sessionInfo );
-
         // Create the session instance.
         ActiveMQSession* session = new ActiveMQSession(
-            sessionInfo, ackMode, *this->config->properties, this );
+            this, getNextSessionId(), ackMode, *this->config->properties );
+
+        return session;
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<SessionId> ActiveMQConnection::getNextSessionId() {
+
+    decaf::lang::Pointer<SessionId> sessionId( new SessionId() );
+    sessionId->setConnectionId( this->config->connectionInfo->getConnectionId()->getValue() );
+    sessionId->setValue( this->config->sessionIds.getNextSequenceId() );
+
+    return sessionId;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::addSession( ActiveMQSession* session ) {
 
-        // Add the session to the set of active sessions.
+    try {
+
+        // Remove this session from the set of active sessions.
         synchronized( &activeSessions ) {
             activeSessions.add( session );
         }
-
-        // If we're already started, start the session.
-        if( this->started.get() ) {
-            session->start();
-        }
-
-        return session;
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
@@ -816,7 +818,7 @@ void ActiveMQConnection::oneway( Pointer
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::syncRequest( Pointer<Command> command, unsigned int timeout ) {
+Pointer<Response> ActiveMQConnection::syncRequest( Pointer<Command> command, unsigned int timeout ) {
 
     try {
 
@@ -841,6 +843,8 @@ void ActiveMQConnection::syncRequest( Po
             // Throw the exception.
             throw exception;
         }
+
+        return response;
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( IOException, ActiveMQException )
@@ -1143,11 +1147,6 @@ void ActiveMQConnection::setProducerWind
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-long long ActiveMQConnection::getNextSessionId() {
-    return this->config->sessionIds.getNextSequenceId();
-}
-
-////////////////////////////////////////////////////////////////////////////////
 long long ActiveMQConnection::getNextTempDestinationId() {
     return this->config->tempDestinationIds.getNextSequenceId();
 }
@@ -1190,3 +1189,8 @@ std::string ActiveMQConnection::getResou
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
+
+////////////////////////////////////////////////////////////////////////////////
+const decaf::util::Properties& ActiveMQConnection::getProperties() const {
+    return *( this->config->properties );
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?rev=1036054&r1=1036053&r2=1036054&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h Wed Nov 17 14:21:19 2010
@@ -25,7 +25,9 @@
 #include <activemq/commands/ActiveMQTempDestination.h>
 #include <activemq/commands/ConnectionInfo.h>
 #include <activemq/commands/ConsumerInfo.h>
+#include <activemq/commands/SessionId.h>
 #include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/transport/Transport.h>
 #include <activemq/transport/TransportListener.h>
 #include <decaf/util/Properties.h>
 #include <decaf/util/StlMap.h>
@@ -140,6 +142,16 @@ namespace core{
         virtual ~ActiveMQConnection() throw();
 
         /**
+         * Adds the session resources for the given session instance.
+         *
+         * @param session
+         *      The session to be added to this connection.
+         *
+         * @throws CMSException if an error occurs while removing performing the operation.
+         */
+        virtual void addSession( ActiveMQSession* session );
+
+        /**
          * Removes the session resources for the given session instance.
          *
          * @param session
@@ -524,12 +536,6 @@ namespace core{
         void setMessagePrioritySupported( bool value );
 
         /**
-         * Get the Next available Session Id.
-         * @return the next id in the sequence.
-         */
-        long long getNextSessionId();
-
-        /**
          * Get the Next Temporary Destination Id
          * @return the next id in the sequence.
          */
@@ -629,22 +635,32 @@ namespace core{
         void cleanup();
 
         /**
-         * Sends a oneway message.
-         * @param command The message to send.
-         * @throws ConnectorException if not currently connected, or
-         * if the operation fails for any reason.
+         * Sends a message without request that the broker send a response to indicate that
+         * it was received.
+         *
+         * @param command
+         *      The Command object to send to the Broker.
+         *
+         * @throws ActiveMQException if not currently connected, or if the operation
+         *         fails for any reason.
          */
         void oneway( Pointer<commands::Command> command );
 
         /**
-         * Sends a synchronous request and returns the response from the broker.
-         * Converts any error responses into an exception.
-         * @param command The request command.
-         * @param timeout The time to wait for a response, default is zero or infinite.
-         * @throws ConnectorException thrown if an error response was received
-         * from the broker, or if any other error occurred.
+         * Sends a synchronous request and returns the response from the broker.  This
+         * method converts any error responses it receives into an exception.
+         *
+         * @param command
+         *      The Command object that is to be sent to the broker.
+         * @param timeout
+         *      The time in milliseconds to wait for a response, default is zero or infinite.
+         *
+         * @returns a Pointer instance to the Response object sent from the Broker.
+         *
+         * @throws BrokerException if the response from the broker is of type ExceptionResponse.
+         * @throws ActiveMQException if any other error occurs while sending the Command.
          */
-        void syncRequest( Pointer<commands::Command> command, unsigned int timeout = 0 );
+        Pointer<commands::Response> syncRequest( Pointer<commands::Command> command, unsigned int timeout = 0 );
 
         /**
          * Notify the exception listener
@@ -673,26 +689,44 @@ namespace core{
          */
         void onAsyncException( const decaf::lang::Exception& ex );
 
-    private:
-
-        // Sends a oneway disconnect message to the broker.
-        void disconnect( long long lastDeliveredSequenceId );
-
-        // Check for Closed State and Throw an exception if true.
+        /**
+         * Check for Closed State and Throw an exception if true.
+         *
+         * @throws CMSException if the Connection is closed.
+         */
         void checkClosed() const;
 
-        // Check for Closed State and Throw an exception if true.
+        /**
+         * Check for Closed State and Failed State and Throw an exception if either is true.
+         *
+         * @throws CMSException if the Connection is closed or failed.
+         */
         void checkClosedOrFailed() const;
 
-        // If its not been sent, then send the ConnectionInfo to the Broker.
+        /**
+         * If its not been sent, then send the ConnectionInfo to the Broker.
+         */
         void ensureConnectionInfoSent();
 
+    protected:
+
+        /**
+         * @return the next available Session Id.
+         */
+        virtual Pointer<commands::SessionId> getNextSessionId();
+
+        // Sends a oneway disconnect message to the broker.
+        void disconnect( long long lastDeliveredSequenceId );
+
         // Waits for all Consumers to handle the Transport Interrupted event.
         void waitForTransportInterruptionProcessingToComplete();
 
         // Marks processing complete for a single caller when interruption processing completes.
         void signalInterruptionProcessingComplete();
 
+        // Allow subclasses to access the original Properties object for this connection.
+        const decaf::util::Properties& getProperties() const;
+
     };
 
 }}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp?rev=1036054&r1=1036053&r2=1036054&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp Wed Nov 17 14:21:19 2010
@@ -174,11 +174,11 @@ ActiveMQConnectionFactory::ActiveMQConne
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-ActiveMQConnectionFactory::ActiveMQConnectionFactory( const std::string& url,
+ActiveMQConnectionFactory::ActiveMQConnectionFactory( const std::string& uri,
                                                       const std::string& username,
                                                       const std::string& password ) : settings( new FactorySettings() ) {
 
-    this->setBrokerURI( URI( url ) );
+    this->setBrokerURI( URI( uri ) );
 
     // Store login data in the properties
     if( !username.empty() ) {
@@ -272,7 +272,7 @@ cms::Connection* ActiveMQConnectionFacto
         Pointer<Properties> properties( this->settings->properties->clone() );
 
         // Create and Return the new connection object.
-        connection.reset( new ActiveMQConnection( transport, properties ) );
+        connection.reset( createActiveMQConnection( transport, properties ) );
 
         // Set all options parsed from the URI.
         configureConnection( connection.get() );
@@ -304,6 +304,14 @@ cms::Connection* ActiveMQConnectionFacto
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+ActiveMQConnection* ActiveMQConnectionFactory::createActiveMQConnection(
+    const Pointer<transport::Transport>& transport,
+    const Pointer<decaf::util::Properties>& properties ) {
+
+    return new ActiveMQConnection( transport, properties );
+}
+
+////////////////////////////////////////////////////////////////////////////////
 cms::Connection* ActiveMQConnectionFactory::createConnection(
     const std::string& uri,
     const std::string& username,

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h?rev=1036054&r1=1036053&r2=1036054&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h Wed Nov 17 14:21:19 2010
@@ -22,11 +22,16 @@
 #include <cms/ConnectionFactory.h>
 #include <cms/Connection.h>
 
+#include <activemq/transport/Transport.h>
+
 #include <decaf/net/URI.h>
+#include <decaf/util/Properties.h>
 
 namespace activemq{
 namespace core{
 
+    using decaf::lang::Pointer;
+
     class ActiveMQConnection;
     class FactorySettings;
     class PrefetchPolicy;
@@ -49,11 +54,11 @@ namespace core{
 
         /**
          * Constructor
-         * @param url the URL of the Broker we are connecting to.
+         * @param url the URI of the Broker we are connecting to.
          * @param username to authenticate with, defaults to ""
          * @param password to authenticate with, defaults to ""
          */
-        ActiveMQConnectionFactory( const std::string& url,
+        ActiveMQConnectionFactory( const std::string& uri,
                                    const std::string& username = "",
                                    const std::string& password = "" );
 
@@ -382,6 +387,23 @@ namespace core{
                                                   const std::string& password,
                                                   const std::string& clientId = "" );
 
+    protected:
+
+        /**
+         * Create a new ActiveMQConnection instnace using the provided Transport and Properties.
+         * Subclasses can override this to control the actual type of ActiveMQConnection that
+         * is created.
+         *
+         * @param transport
+         *      The Transport that the Connection should use to communicate with the Broker.
+         * @param properties
+         *      The Properties that are assigned to the new Connection instance.
+         *
+         * @returns a new ActiveMQConnection pointer instance.
+         */
+        virtual ActiveMQConnection* createActiveMQConnection( const Pointer<transport::Transport>& transport,
+                                                              const Pointer<decaf::util::Properties>& properties );
+
     private:
 
         cms::Connection* doCreateConnection( const decaf::net::URI& uri,

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?rev=1036054&r1=1036053&r2=1036054&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Wed Nov 17 14:21:19 2010
@@ -67,30 +67,40 @@ using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
 
 ////////////////////////////////////////////////////////////////////////////////
-ActiveMQSession::ActiveMQSession( const Pointer<SessionInfo>& sessionInfo,
+ActiveMQSession::ActiveMQSession( ActiveMQConnection* connection,
+                                  const Pointer<SessionId>& id,
                                   cms::Session::AcknowledgeMode ackMode,
-                                  const Properties& properties,
-                                  ActiveMQConnection* connection ) {
+                                  const Properties& properties ) {
 
-    if( sessionInfo == NULL || connection == NULL ) {
+    if( id == NULL || connection == NULL ) {
         throw ActiveMQException(
             __FILE__, __LINE__,
-            "ActiveMQSession::ActiveMQSession - Init with NULL data");
+            "ActiveMQSession::ActiveMQSession - Constructor called with NULL data");
     }
 
-    this->sessionInfo = sessionInfo;
+    this->sessionInfo.reset( new SessionInfo() );
+    this->sessionInfo->setAckMode( ackMode );
+    this->sessionInfo->setSessionId( id );
+
+    connection->oneway( this->sessionInfo );
+
     this->connection = connection;
     this->closed = false;
     this->ackMode = ackMode;
     this->lastDeliveredSequenceId = -1;
 
-    // Create a Transaction object only if the session is transacted
-    if( this->isTransacted() ) {
-        this->transaction.reset( new ActiveMQTransactionContext( this, properties ) );
-    }
+    // Create a Transaction objet
+    this->transaction.reset( new ActiveMQTransactionContext( this, properties ) );
 
     // Create the session executor object.
     this->executor.reset( new ActiveMQSessionExecutor( this ) );
+
+    this->connection->addSession( this );
+
+    // If the connection is already started, start the session.
+    if( this->connection->isStarted() ) {
+        this->start();
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h?rev=1036054&r1=1036053&r2=1036054&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h Wed Nov 17 14:21:19 2010
@@ -67,7 +67,7 @@ namespace core{
 
         friend class ActiveMQSessionExecutor;
 
-    private:
+    protected:
 
         /**
          * SessionInfo for this Session
@@ -136,10 +136,10 @@ namespace core{
 
     public:
 
-        ActiveMQSession( const Pointer<commands::SessionInfo>& sessionInfo,
+        ActiveMQSession( ActiveMQConnection* connection,
+                         const Pointer<commands::SessionId>& id,
                          cms::Session::AcknowledgeMode ackMode,
-                         const decaf::util::Properties& properties,
-                         ActiveMQConnection* connection );
+                         const decaf::util::Properties& properties );
 
         virtual ~ActiveMQSession() throw();
 
@@ -147,7 +147,7 @@ namespace core{
          * Redispatches the given set of unconsumed messages to the consumers.
          * @param unconsumedMessages - unconsumed messages to be redelivered.
          */
-        void redispatch( MessageDispatchChannel& unconsumedMessages );
+        virtual void redispatch( MessageDispatchChannel& unconsumedMessages );
 
         /**
          * Stops asynchronous message delivery.
@@ -165,19 +165,19 @@ namespace core{
          */
         bool isStarted() const;
 
-        bool isAutoAcknowledge() const {
+        virtual bool isAutoAcknowledge() const {
             return this->ackMode == cms::Session::AUTO_ACKNOWLEDGE;
         }
 
-        bool isDupsOkAcknowledge() const {
+        virtual bool isDupsOkAcknowledge() const {
             return this->ackMode == cms::Session::DUPS_OK_ACKNOWLEDGE;
         }
 
-        bool isClientAcknowledge() const {
+        virtual bool isClientAcknowledge() const {
             return this->ackMode == cms::Session::CLIENT_ACKNOWLEDGE;
         }
 
-        bool isIndividualAcknowledge() const {
+        virtual bool isIndividualAcknowledge() const {
             return this->ackMode == cms::Session::INDIVIDUAL_ACKNOWLEDGE;
         }
 
@@ -399,7 +399,7 @@ namespace core{
          *
          * @throw ActiveMQException if this is not a Transacted Session.
          */
-        void doStartTransaction();
+        virtual void doStartTransaction();
 
         /**
          * Gets the Pointer to this Session's TransactionContext

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp?rev=1036054&r1=1036053&r2=1036054&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp Wed Nov 17 14:21:19 2010
@@ -16,14 +16,24 @@
  */
 #include "ActiveMQTransactionContext.h"
 
+#include <cms/Xid.h>
+#include <cms/XAException.h>
 #include <activemq/core/ActiveMQSession.h>
 #include <activemq/core/ActiveMQConnection.h>
 #include <activemq/core/ActiveMQConstants.h>
 #include <activemq/commands/TransactionInfo.h>
+#include <activemq/commands/Response.h>
+#include <activemq/commands/IntegerResponse.h>
+#include <activemq/commands/DataArrayResponse.h>
+#include <activemq/commands/LocalTransactionId.h>
+#include <activemq/commands/XATransactionId.h>
+#include <activemq/util/CMSExceptionSupport.h>
 #include <decaf/lang/exceptions/NullPointerException.h>
 #include <decaf/lang/Integer.h>
 #include <decaf/lang/Long.h>
 #include <decaf/util/Iterator.h>
+#include <decaf/util/StlList.h>
+#include <decaf/util/concurrent/ConcurrentStlMap.h>
 
 using namespace std;
 using namespace cms;
@@ -31,6 +41,7 @@ using namespace activemq;
 using namespace activemq::core;
 using namespace activemq::commands;
 using namespace activemq::exceptions;
+using namespace activemq::util;
 using namespace decaf;
 using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
@@ -38,8 +49,56 @@ using namespace decaf::util;
 using namespace decaf::util::concurrent;
 
 ////////////////////////////////////////////////////////////////////////////////
+namespace activemq{
+namespace core{
+
+    class TxContextData {
+    public:
+
+        // Tracks local transactions
+        Pointer<commands::TransactionId> transactionId;
+
+        // To track XA transactions.
+        Pointer<Xid> associatedXid;
+        int beforeEndIndex;
+
+        // Global collection of all Ended XA Transactions.
+//        static ConcurrentStlMap< Pointer<TransactionId>,
+//                                 StlList<Synchronization*>,
+//                                 TransactionId::COMPARATOR >* ENDED_XA_TRANSACTION_CONTEXTS;
+
+        TxContextData() {
+        }
+
+    };
+
+}}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    class Finally {
+    private:
+
+        decaf::util::StlSet< Pointer<Synchronization> >* syncs;
+
+    public:
+
+        Finally( decaf::util::StlSet< Pointer<Synchronization> >* syncs ) : syncs( syncs ) {
+        }
+
+        ~Finally() {
+            if( this->syncs != NULL ) {
+                this->syncs->clear();
+            }
+        }
+    };
+
+}
+
+////////////////////////////////////////////////////////////////////////////////
 ActiveMQTransactionContext::ActiveMQTransactionContext( ActiveMQSession* session,
-                                                        const Properties& properties AMQCPP_UNUSED) {
+                                                        const Properties& properties AMQCPP_UNUSED ) : context(NULL) {
     try {
 
         if( session == NULL ) {
@@ -49,6 +108,8 @@ ActiveMQTransactionContext::ActiveMQTran
                 "Initialized with a NULL session data");
         }
 
+        this->context = new TxContextData();
+
         // Store State Data
         this->session = session;
         this->connection = session->getConnection();
@@ -60,6 +121,10 @@ ActiveMQTransactionContext::ActiveMQTran
 
 ////////////////////////////////////////////////////////////////////////////////
 ActiveMQTransactionContext::~ActiveMQTransactionContext() {
+    try{
+        delete this->context;
+    }
+    AMQ_CATCHALL_NOTHROW()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -85,7 +150,7 @@ void ActiveMQTransactionContext::begin()
 
         if( !isInTransaction() ) {
 
-            synchronized( &synchronizations ) {
+            synchronized( &this->synchronizations ) {
                 this->synchronizations.clear();
             }
 
@@ -102,7 +167,7 @@ void ActiveMQTransactionContext::begin()
 
             this->connection->oneway( transactionInfo );
 
-            this->transactionId = id.dynamicCast<TransactionId>();
+            this->context->transactionId = id.dynamicCast<TransactionId>();
         }
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
@@ -115,7 +180,7 @@ void ActiveMQTransactionContext::commit(
 
     try{
 
-        if( this->transactionId.get() == NULL ) {
+        if( this->context->transactionId.get() == NULL ) {
             throw InvalidStateException(
                 __FILE__, __LINE__,
                 "ActiveMQTransactionContext::commit - "
@@ -127,11 +192,11 @@ void ActiveMQTransactionContext::commit(
         // Create and Populate the Info Command.
         Pointer<TransactionInfo> info( new TransactionInfo() );
         info->setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
-        info->setTransactionId( this->transactionId );
+        info->setTransactionId( this->context->transactionId );
         info->setType( ActiveMQConstants::TRANSACTION_STATE_COMMITONEPHASE );
 
-        // Before we send the command null the id in case of an exception.
-        this->transactionId.reset( NULL );
+        // Before we send the command NULL the id in case of an exception.
+        this->context->transactionId.reset( NULL );
 
         // Commit the current Transaction
         this->connection->syncRequest( info );
@@ -148,7 +213,7 @@ void ActiveMQTransactionContext::rollbac
 
     try{
 
-        if( this->transactionId == NULL ) {
+        if( this->context->transactionId == NULL ) {
             throw InvalidStateException(
                 __FILE__, __LINE__,
                 "ActiveMQTransactionContext::rollback - "
@@ -160,11 +225,11 @@ void ActiveMQTransactionContext::rollbac
         // Create and Populate the Info Command.
         Pointer<TransactionInfo> info( new TransactionInfo() );
         info->setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
-        info->setTransactionId( this->transactionId );
+        info->setTransactionId( this->context->transactionId );
         info->setType( ActiveMQConstants::TRANSACTION_STATE_ROLLBACK );
 
-        // Before we send the command null the id in case of an exception.
-        this->transactionId.reset( NULL );
+        // Before we send the command NULL the id in case of an exception.
+        this->context->transactionId.reset( NULL );
 
         // Roll back the current Transaction
         this->connection->syncRequest( info );
@@ -197,6 +262,8 @@ void ActiveMQTransactionContext::afterCo
     // Notify each registered Synchronization that we committed this Transaction.
     synchronized( &this->synchronizations ) {
 
+        Finally finalizer( &this->synchronizations );
+
         std::auto_ptr<decaf::util::Iterator< Pointer<Synchronization> > > iter(
             this->synchronizations.iterator() );
 
@@ -212,6 +279,8 @@ void ActiveMQTransactionContext::afterRo
     // Notify each registered Synchronization that we rolled back this Transaction.
     synchronized( &this->synchronizations ) {
 
+        Finally finalizer( &this->synchronizations );
+
         std::auto_ptr<decaf::util::Iterator< Pointer<Synchronization> > > iter(
             this->synchronizations.iterator() );
 
@@ -223,15 +292,510 @@ void ActiveMQTransactionContext::afterRo
 
 ////////////////////////////////////////////////////////////////////////////////
 const Pointer<TransactionId>& ActiveMQTransactionContext::getTransactionId() const {
-    if( this->transactionId == NULL ) {
+    if( this->context->transactionId == NULL ) {
         throw decaf::lang::exceptions::InvalidStateException(
             __FILE__, __LINE__, "Transaction Not Started." );
     }
 
-    return transactionId;
+    return this->context->transactionId;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 bool ActiveMQTransactionContext::isInTransaction() const {
-    return this->transactionId != NULL;
+    return this->context->transactionId != NULL;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQTransactionContext::isInLocalTransaction() const {
+    return this->context->transactionId != NULL && this->context->transactionId->isLocalTransactionId();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQTransactionContext::isInXATransaction() const {
+    return this->context->transactionId != NULL && this->context->transactionId->isXATransactionId();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int ActiveMQTransactionContext::getTransactionTimeout() const {
+    return 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQTransactionContext::setTransactionTimeout( int seconds AMQCPP_UNUSED ) {
+    return false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int ActiveMQTransactionContext::recover( int flag AMQCPP_UNUSED, Xid** recovered ) {
+
+    Pointer<TransactionInfo> info( new TransactionInfo() );
+    info->setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
+    info->setType( ActiveMQConstants::TRANSACTION_STATE_RECOVER );
+
+    try {
+
+        this->connection->checkClosedOrFailed();
+        this->connection->ensureConnectionInfoSent();
+
+        Pointer<Response> response = this->connection->syncRequest( info );
+        Pointer<DataArrayResponse> arrayResponse = response.dynamicCast<DataArrayResponse>();
+
+        std::vector< Pointer<DataStructure> > array = arrayResponse->getData();
+
+        int size = (int)array.size();
+
+        if( size > 0 ) {
+
+            // Allocate space for all the recovered Xid's, if client passed us an existing
+            // array then this would leak, but they were warned, so just go with it.
+            recovered = new Xid*[array.size()];
+
+            // We need to clone each Xid and then add it to the array, the client is now
+            // responsible for freeing this memory.
+            for( int i = 0; i < size; ++i ) {
+                Pointer<XATransactionId> xid = array[i].dynamicCast<XATransactionId>();
+                recovered[i] = xid->clone();
+            }
+        }
+
+        return size;
+    } catch( Exception& e ) {
+        throw toXAException( e );
+    } catch( CMSException& e ) {
+        throw toXAException( e );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQTransactionContext::start( const Xid* xid, int flags ) {
+
+    if( this->isInLocalTransaction() ) {
+        throw XAException( XAException::XAER_PROTO );
+    }
+
+    // Are we already associated?
+    if( this->context->associatedXid != NULL ) {
+        throw new XAException( XAException::XAER_PROTO );
+    }
+
+    const char* txSuspendResumeNotSupportMsg =
+                    "The suspend/resume of a transaction " \
+                    "is not supported. Instead it is recommended " \
+                    "that a new JMS session be created.";
+
+    if( ( flags & TMJOIN ) == TMJOIN ) {
+        throw XAException( txSuspendResumeNotSupportMsg );
+    }
+    if( ( flags & TMRESUME ) == TMRESUME ) {
+        throw XAException( txSuspendResumeNotSupportMsg );
+    }
+
+    // prepare for a new association
+    this->synchronizations.clear();
+    this->context->beforeEndIndex = 0;
+
+    this->setXid( xid );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int ActiveMQTransactionContext::prepare( const Xid* xid ) {
+
+    // We allow interleaving multiple transactions, so we don't limit prepare to the associated xid.
+    Pointer<XATransactionId> x;
+
+    // THIS SHOULD NEVER HAPPEN because end(xid, TMSUCCESS) should have been called first
+    if( xid == NULL || equals( this->context->associatedXid.get(), xid ) ) {
+        throw XAException( XAException::XAER_PROTO );
+    } else {
+        x.reset( new XATransactionId( xid ) );
+    }
+
+    try {
+
+        Pointer<TransactionInfo> info( new TransactionInfo() );
+        info->setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
+        info->setTransactionId( x );
+        info->setType( ActiveMQConstants::TRANSACTION_STATE_PREPARE );
+
+        // Find out if the server wants to commit or rollback.
+        Pointer<Response> response = this->connection->syncRequest( info );
+
+        Pointer<IntegerResponse> intResponse = response.dynamicCast<IntegerResponse>();
+
+        if( XAResource::XA_RDONLY == intResponse->getResult() ) {
+
+            // transaction stops now, may be syncs that need a callback
+//            StlList<TransactionContext> l = this->context->ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
+//            if (l != NULL && !l.isEmpty()) {
+//                for (TransactionContext ctx : l) {
+//                    ctx.afterCommit();
+//                }
+//            }
+
+            this->afterCommit();
+        }
+
+        return intResponse->getResult();
+
+    } catch( Exception& e ) {
+
+        try{
+            this->afterRollback();
+        } catch(...) {
+        }
+
+        throw toXAException( e );
+
+    } catch( CMSException& e ) {
+//        List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
+//        if (l != NULL && !l.isEmpty()) {
+//            for (TransactionContext ctx : l) {
+//                try {
+//                    ctx.afterRollback();
+//                } catch (Throwable ignored) {
+//                }
+//            }
+//        }
+
+        try{
+            this->afterRollback();
+        } catch(...) {
+        }
+
+        throw toXAException( e );
+    }
+
+    return 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQTransactionContext::commit( const Xid* xid, bool onePhase ) {
+
+    // We allow interleaving multiple transactions, so we don't limit prepare to the associated xid.
+    Pointer<XATransactionId> x;
+
+    // THIS SHOULD NEVER HAPPEN because end(xid, TMSUCCESS) should have been called first
+    if( xid == NULL || equals( this->context->associatedXid.get(), xid ) ) {
+        throw XAException( XAException::XAER_PROTO );
+    } else {
+        x.reset( new XATransactionId( xid ) );
+    }
+
+    try {
+
+        this->connection->checkClosedOrFailed();
+        this->connection->ensureConnectionInfoSent();
+
+        // Let the server know that the tx is rollback.
+        Pointer<TransactionInfo> info( new TransactionInfo() );
+        info->setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
+        info->setTransactionId( x );
+        info->setType( onePhase ? ActiveMQConstants::TRANSACTION_STATE_COMMITONEPHASE :
+                                  ActiveMQConstants::TRANSACTION_STATE_COMMITONEPHASE );
+
+        this->connection->syncRequest( info );
+
+//        List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
+//        if (l != NULL && !l.isEmpty()) {
+//            for (TransactionContext ctx : l) {
+//                ctx.afterCommit();
+//            }
+//        }
+
+        this->afterCommit();
+
+    } catch( Exception& ex ) {
+
+        try {
+            this->afterRollback();
+        } catch(...) {
+        }
+
+        throw toXAException( ex );
+
+    } catch( CMSException& e ) {
+//        List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
+//        if (l != NULL && !l.isEmpty()) {
+//            for (TransactionContext ctx : l) {
+//                try {
+//                    ctx.afterRollback();
+//                } catch(...) {
+//                }
+//            }
+//        }
+
+        try {
+            this->afterRollback();
+        } catch(...) {
+        }
+
+        throw toXAException( e );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQTransactionContext::rollback( const Xid* xid ) {
+
+    // We allow interleaving multiple transactions, so we don't limit prepare to the associated xid.
+    Pointer<XATransactionId> x;
+
+    if( xid == NULL ) {
+        throw XAException( XAException::XAER_PROTO );
+    }
+
+    if( equals( this->context->associatedXid.get(), xid ) ) {
+        x = this->context->transactionId.dynamicCast<XATransactionId>();
+    } else {
+        x.reset( new XATransactionId( xid ) );
+    }
+
+    try {
+
+        this->connection->checkClosedOrFailed();
+        this->connection->ensureConnectionInfoSent();
+
+        // Let the server know that the tx is rollback.
+        Pointer<TransactionInfo> info( new TransactionInfo() );
+        info->setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
+        info->setTransactionId( x );
+        info->setType( ActiveMQConstants::TRANSACTION_STATE_ROLLBACK );
+
+        this->connection->syncRequest( info );
+
+//        List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
+//        if (l != NULL && !l.isEmpty()) {
+//            for (TransactionContext ctx : l) {
+//                ctx.afterRollback();
+//            }
+//        }
+
+        this->afterRollback();
+
+    } catch( Exception& ex ) {
+        throw toXAException( ex );
+    } catch( CMSException& e ) {
+        throw toXAException( e );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQTransactionContext::end( const Xid* xid, int flags ) {
+
+    if( isInLocalTransaction() ) {
+        throw XAException( XAException::XAER_PROTO );
+    }
+
+    if( ( flags & ( TMSUSPEND | TMFAIL ) ) != 0 ) {
+
+        // You can only suspend the associated xid.
+        if( !equals( this->context->associatedXid.get(), xid ) ) {
+            throw XAException( XAException::XAER_PROTO );
+        }
+
+        try {
+            this->beforeEnd();
+        } catch( Exception& e ) {
+            throw toXAException( e );
+        } catch( CMSException& e ) {
+            throw toXAException( e );
+        }
+
+        setXid( NULL );
+
+    } else if( ( flags & TMSUCCESS ) == TMSUCCESS ) {
+
+        // set to NULL if this is the current xid.
+        // otherwise this could be an asynchronous success call
+        if( equals( this->context->associatedXid.get(), xid ) ) {
+
+            try {
+                beforeEnd();
+            } catch( Exception& ex ) {
+                throw toXAException( ex );
+            } catch( CMSException& e ) {
+                throw toXAException( e );
+            }
+
+            setXid( NULL );
+        }
+    } else {
+        throw XAException( XAException::XAER_INVAL );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQTransactionContext::forget( const Xid* xid ) {
+
+    // We allow interleaving multiple transactions, so we don't limit prepare to the associated xid.
+    Pointer<XATransactionId> x;
+
+    if( xid == NULL ) {
+        throw XAException( XAException::XAER_PROTO );
+    }
+
+    if( equals( this->context->associatedXid.get(), xid ) ) {
+        x = this->context->transactionId.dynamicCast<XATransactionId>();
+    } else {
+        x.reset( new XATransactionId( xid ) );
+    }
+
+    // Let the server know that the tx is rollback.
+    Pointer<TransactionInfo> info( new TransactionInfo() );
+    info->setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
+    info->setTransactionId( x );
+    info->setType( ActiveMQConstants::TRANSACTION_STATE_FORGET );
+
+    try {
+        this->connection->syncRequest( info );
+    } catch( Exception& ex ) {
+        throw toXAException( ex );
+    } catch( CMSException& e ) {
+        throw toXAException( e );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQTransactionContext::isSameRM( const XAResource* resource ) {
+
+    if( resource == NULL ) {
+        return false;
+    }
+
+    const ActiveMQTransactionContext* cntx =
+        dynamic_cast<const ActiveMQTransactionContext*>( resource );
+
+    if( cntx == NULL ) {
+        return false;
+    }
+
+    try{
+        return getResourceManagerId() == cntx->getResourceManagerId();
+    } catch( Exception& ex ) {
+        throw toXAException( ex );
+    } catch( CMSException& ex ) {
+        throw XAException( "Could not get the Resource Manager Id.", &ex );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQTransactionContext::setXid( const Xid* xid ) {
+
+    try {
+        this->connection->checkClosedOrFailed();
+        this->connection->ensureConnectionInfoSent();
+    } catch( Exception& e ) {
+        throw toXAException( e );
+    } catch( CMSException& e ) {
+        throw toXAException( e );
+    }
+
+    if( xid != NULL ) {
+
+        // Associate this new Xid with this Transaction as the root of the TX.
+        this->context->associatedXid.reset( xid->clone() );
+        this->context->transactionId.reset( new XATransactionId( xid ) );
+
+        Pointer<TransactionInfo> info( new TransactionInfo() );
+        info->setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
+        info->setTransactionId( this->context->transactionId );
+        info->setType( ActiveMQConstants::TRANSACTION_STATE_BEGIN );
+
+        try {
+            this->connection->oneway( info );
+        } catch( Exception& e ) {
+            throw toXAException( e );
+        } catch( CMSException& e ) {
+            throw toXAException( e );
+        }
+
+    } else {
+
+        if( this->context->transactionId != NULL ) {
+
+            Pointer<TransactionInfo> info( new TransactionInfo() );
+            info->setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
+            info->setTransactionId( this->context->transactionId );
+            info->setType( ActiveMQConstants::TRANSACTION_STATE_END );
+
+            try {
+                this->connection->syncRequest( info );
+            } catch( CMSException& e ) {
+                throw toXAException( e );
+            }
+
+//            // Add our self to the list of contexts that are interested in
+//            // post commit/rollback events.
+//            List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId);
+//            if (l == NULL) {
+//                l = new ArrayList<TransactionContext>(3);
+//                ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l);
+//                l.add(this);
+//            } else if (!l.contains(this)) {
+//                l.add(this);
+//            }
+        }
+
+        // remove the association currently in place.
+        this->context->associatedXid.reset( NULL );
+        this->context->transactionId.reset( NULL );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQTransactionContext::equals( const cms::Xid* local, const cms::Xid* remote ) {
+
+    if( local == remote ) {
+        return true;
+    }
+
+    if( ( local == NULL ) ^ ( remote == NULL ) ) {
+        return false;
+    }
+
+    if( local->getFormatId() != remote->getFormatId() ) {
+        return false;
+    } else {
+
+        std::vector<unsigned char> localBQual( Xid::MAXBQUALSIZE );
+        std::vector<unsigned char> remoteBQual( Xid::MAXBQUALSIZE );
+
+        local->getBranchQualifier( &localBQual[0], Xid::MAXBQUALSIZE );
+        remote->getBranchQualifier( &remoteBQual[0], Xid::MAXBQUALSIZE );
+
+        if( localBQual != remoteBQual ) {
+            return false;
+        }
+
+        std::vector<unsigned char> localGTXID( Xid::MAXBQUALSIZE );
+        std::vector<unsigned char> remoteGTXID( Xid::MAXBQUALSIZE );
+
+        local->getGlobalTransactionId( &localGTXID[0], Xid::MAXGTRIDSIZE );
+        remote->getGlobalTransactionId( &remoteGTXID[0], Xid::MAXGTRIDSIZE );
+
+        if( localGTXID != remoteGTXID ) {
+            return false;
+        }
+    }
+
+    return true;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+std::string ActiveMQTransactionContext::getResourceManagerId() const {
+    return this->connection->getResourceManagerId();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+XAException ActiveMQTransactionContext::toXAException( decaf::lang::Exception& ex ) {
+    CMSException cmsEx = CMSExceptionSupport::create( ex );
+    XAException xae( ex.getMessage(), &cmsEx );
+    xae.setErrorCode( XAException::XAER_RMFAIL );
+    return xae;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+XAException ActiveMQTransactionContext::toXAException( cms::CMSException& ex ) {
+    XAException xae( ex.getMessage(), &ex );
+    xae.setErrorCode( XAException::XAER_RMFAIL );
+    return xae;
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.h?rev=1036054&r1=1036053&r2=1036054&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.h Wed Nov 17 14:21:19 2010
@@ -20,6 +20,9 @@
 #include <memory>
 
 #include <cms/Message.h>
+#include <cms/XAResource.h>
+#include <cms/CMSException.h>
+#include <cms/XAException.h>
 
 #include <activemq/util/Config.h>
 #include <activemq/exceptions/ActiveMQException.h>
@@ -37,8 +40,10 @@ namespace core{
 
     using decaf::lang::Pointer;
 
+    class LocalTransactionEventListener;
     class ActiveMQSession;
     class ActiveMQConnection;
+    class TxContextData;
 
     /**
      * Transaction Management class, hold messages that are to be redelivered
@@ -49,18 +54,18 @@ namespace core{
      *
      * @since 2.0
      */
-    class AMQCPP_API ActiveMQTransactionContext {
+    class AMQCPP_API ActiveMQTransactionContext : public cms::XAResource {
     private:
 
+        // Internal structure to hold all class TX data.
+        TxContextData* context;
+
         // Session this Transaction is associated with
         ActiveMQSession* session;
 
         // The Connection that is the parent of the Session.
         ActiveMQConnection* connection;
 
-        // Transaction Info for the current Transaction
-        Pointer<commands::TransactionId> transactionId;
-
         // List of Registered Synchronizations
         decaf::util::StlSet< Pointer<Synchronization> > synchronizations;
 
@@ -130,8 +135,52 @@ namespace core{
          */
         virtual bool isInTransaction() const;
 
+        /**
+         * Checks to see if there is currently an Local Transaction in progess, returns
+         * false if not, true otherwise.
+         *
+         * @returns true if an Local Transaction is in progress.
+         */
+        virtual bool isInLocalTransaction() const;
+
+        /**
+         * Checks to see if there is currently an XA Transaction in progess, returns
+         * false if not, true otherwise.
+         *
+         * @returns true if an XA Transaction is in progress.
+         */
+        virtual bool isInXATransaction() const;
+
+    public:  // XAResource implementation.
+
+        virtual void commit( const cms::Xid* xid, bool onePhase );
+
+        virtual void end( const cms::Xid* xid, int flags );
+
+        virtual void forget( const cms::Xid* xid );
+
+        virtual int getTransactionTimeout() const;
+
+        virtual bool isSameRM( const cms::XAResource* theXAResource );
+
+        virtual int prepare( const cms::Xid* xid );
+
+        virtual int recover(int flag, cms::Xid** recovered );
+
+        virtual void rollback( const cms::Xid* xid );
+
+        virtual bool setTransactionTimeout( int seconds );
+
+        virtual void start( const cms::Xid* xid, int flags );
+
     private:
 
+        std::string getResourceManagerId() const;
+        void setXid( const cms::Xid* xid );
+        bool equals( const cms::Xid* local, const cms::Xid* remote );
+        cms::XAException toXAException( cms::CMSException& ex );
+        cms::XAException toXAException( decaf::lang::Exception& ex );
+
         void beforeEnd();
         void afterCommit();
         void afterRollback();

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnection.cpp?rev=1036054&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnection.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnection.cpp Wed Nov 17 14:21:19 2010
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ActiveMQXAConnection.h"
+
+#include <activemq/core/ActiveMQXASession.h>
+#include <activemq/util/CMSExceptionSupport.h>
+
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQXAConnection::ActiveMQXAConnection( const Pointer<transport::Transport>& transport,
+                                            const Pointer<decaf::util::Properties>& properties )
+  : ActiveMQConnection(transport, properties ) {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQXAConnection::~ActiveMQXAConnection() throw() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::XASession* ActiveMQXAConnection::createXASession() {
+    return dynamic_cast<cms::XASession*>( this->createSession( cms::Session::SESSION_TRANSACTED ) );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::Session* ActiveMQXAConnection::createSession( cms::Session::AcknowledgeMode ackMode AMQCPP_UNUSED ) {
+
+    try {
+
+        checkClosedOrFailed();
+        ensureConnectionInfoSent();
+
+        // Create the session instance.
+        cms::Session* session = new ActiveMQXASession(
+            this, getNextSessionId(), this->getProperties() );
+
+        return session;
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnection.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnection.h?rev=1036054&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnection.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnection.h Wed Nov 17 14:21:19 2010
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CORE_ACTIVEMQXACONNECTION_H_
+#define _ACTIVEMQ_CORE_ACTIVEMQXACONNECTION_H_
+
+#include <activemq/util/Config.h>
+
+#include <cms/XAConnection.h>
+#include <activemq/core/ActiveMQConnection.h>
+
+namespace activemq {
+namespace core {
+
+    using decaf::lang::Pointer;
+
+    class AMQCPP_API ActiveMQXAConnection : public cms::XAConnection,
+                                            public ActiveMQConnection {
+    private:
+
+        ActiveMQXAConnection( const ActiveMQXAConnection& );
+        ActiveMQXAConnection& operator= ( const ActiveMQXAConnection& );
+
+    public:
+
+        ActiveMQXAConnection( const Pointer<transport::Transport>& transport,
+                              const Pointer<decaf::util::Properties>& properties );
+
+        virtual ~ActiveMQXAConnection() throw();
+
+        virtual cms::XASession* createXASession();
+
+        virtual cms::Session* createSession( cms::Session::AcknowledgeMode ackMode );
+
+    };
+
+}}
+
+#endif /* _ACTIVEMQ_CORE_ACTIVEMQXACONNECTION_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnection.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnectionFactory.cpp?rev=1036054&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnectionFactory.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnectionFactory.cpp Wed Nov 17 14:21:19 2010
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ActiveMQXAConnectionFactory.h"
+
+#include <activemq/core/ActiveMQXAConnection.h>
+
+using namespace activemq;
+using namespace activemq::core;
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQXAConnectionFactory::ActiveMQXAConnectionFactory() :
+    ActiveMQConnectionFactory() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQXAConnectionFactory::ActiveMQXAConnectionFactory( const std::string& uri,
+                                                          const std::string& username,
+                                                          const std::string& password ) :
+    ActiveMQConnectionFactory( uri, username, password ) {
+
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQXAConnectionFactory::ActiveMQXAConnectionFactory( const decaf::net::URI& uri,
+                                                          const std::string& username,
+                                                          const std::string& password ) :
+    ActiveMQConnectionFactory( uri, username, password ) {
+
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQXAConnectionFactory::~ActiveMQXAConnectionFactory() throw() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::XAConnection* ActiveMQXAConnectionFactory::createXAConnection() {
+    return dynamic_cast<cms::XAConnection*>( createConnection() );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::XAConnection* ActiveMQXAConnectionFactory::createXAConnection( const std::string& userName,
+                                                                    const std::string& password ) {
+    return dynamic_cast<cms::XAConnection*>( createConnection( userName, password ) );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQConnection* ActiveMQXAConnectionFactory::createActiveMQConnection(
+    const Pointer<transport::Transport>& transport,
+    const Pointer<decaf::util::Properties>& properties ) {
+
+    return new ActiveMQXAConnection( transport, properties );
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnectionFactory.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnectionFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnectionFactory.h?rev=1036054&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnectionFactory.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnectionFactory.h Wed Nov 17 14:21:19 2010
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CORE_ACTIVEMQXACONNECTIONFACTORY_H_
+#define _ACTIVEMQ_CORE_ACTIVEMQXACONNECTIONFACTORY_H_
+
+#include <activemq/util/Config.h>
+
+#include <cms/XAConnectionFactory.h>
+#include <activemq/core/ActiveMQConnectionFactory.h>
+#include <decaf/net/URI.h>
+#include <decaf/util/Properties.h>
+
+namespace activemq {
+namespace core {
+
+    using decaf::lang::Pointer;
+
+    class AMQCPP_API ActiveMQXAConnectionFactory : public cms::XAConnectionFactory,
+                                                   public ActiveMQConnectionFactory {
+    public:
+
+        ActiveMQXAConnectionFactory();
+
+        /**
+         * Constructor
+         * @param uri the URI of the Broker we are connecting to.
+         * @param username to authenticate with, defaults to ""
+         * @param password to authenticate with, defaults to ""
+         */
+        ActiveMQXAConnectionFactory( const std::string& uri,
+                                     const std::string& username = "",
+                                     const std::string& password = "" );
+
+        /**
+         * Constructor
+         * @param uri the URI of the Broker we are connecting to.
+         * @param username to authenticate with, defaults to ""
+         * @param password to authenticate with, defaults to ""
+         */
+        ActiveMQXAConnectionFactory( const decaf::net::URI& uri,
+                                     const std::string& username = "",
+                                     const std::string& password = "" );
+
+        virtual ~ActiveMQXAConnectionFactory() throw();
+
+        virtual cms::XAConnection* createXAConnection();
+
+        virtual cms::XAConnection* createXAConnection( const std::string& userName,
+                                                       const std::string& password );
+
+    protected:
+
+        virtual ActiveMQConnection* createActiveMQConnection( const Pointer<transport::Transport>& transport,
+                                                              const Pointer<decaf::util::Properties>& properties );
+
+    };
+
+}}
+
+#endif /* _ACTIVEMQ_CORE_ACTIVEMQXACONNECTIONFACTORY_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnectionFactory.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXASession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXASession.cpp?rev=1036054&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXASession.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXASession.cpp Wed Nov 17 14:21:19 2010
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ActiveMQXASession.h"
+
+#include <cms/TransactionInProgressException.h>
+#include <activemq/core/ActiveMQTransactionContext.h>
+
+using namespace activemq;
+using namespace activemq::core;
+using namespace decaf::lang;
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQXASession::ActiveMQXASession( ActiveMQConnection* connection,
+                                      const Pointer<commands::SessionId>& sessionId,
+                                      const decaf::util::Properties& properties ) :
+    ActiveMQSession( connection, sessionId, cms::Session::AUTO_ACKNOWLEDGE, properties ) {
+
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQXASession::~ActiveMQXASession() throw() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQXASession::isTransacted() const {
+    return this->transaction->isInXATransaction();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQXASession::isAutoAcknowledge() const {
+    // Force this to always be true so the Session acts like an Auto Ack session
+    // when there is no active XA Transaction.
+    return true;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQXASession::doStartTransaction() {
+    // Controlled by the XAResource so this method is now a No-op.
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQXASession::commit() {
+    throw cms::TransactionInProgressException("Cannot commit inside an XASession");
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQXASession::rollback() {
+    throw cms::TransactionInProgressException("Cannot rollback inside an XASession");
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::XAResource* ActiveMQXASession::getXAResource() const {
+    return this->transaction.get();
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXASession.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXASession.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXASession.h?rev=1036054&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXASession.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXASession.h Wed Nov 17 14:21:19 2010
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CORE_ACTIVEMQXASESSION_H_
+#define _ACTIVEMQ_CORE_ACTIVEMQXASESSION_H_
+
+#include <activemq/util/Config.h>
+
+#include <cms/XASession.h>
+#include <activemq/core/ActiveMQSession.h>
+
+namespace activemq {
+namespace core {
+
+    using decaf::lang::Pointer;
+
+    class AMQCPP_API ActiveMQXASession : public cms::XASession,
+                                         public ActiveMQSession {
+    public:
+
+        ActiveMQXASession( ActiveMQConnection* connection,
+                           const Pointer<commands::SessionId>& sessionId,
+                           const decaf::util::Properties& properties );
+
+        virtual ~ActiveMQXASession() throw();
+
+    public:  // Override ActiveMQSession methods to make them XA Aware
+
+        virtual bool isTransacted() const;
+
+        virtual bool isAutoAcknowledge() const;
+
+        virtual void doStartTransaction();
+
+        virtual void commit();
+
+        virtual void rollback();
+
+    public:  // XASession overrides
+
+        virtual cms::XAResource* getXAResource() const;
+
+    };
+
+}}
+
+#endif /* _ACTIVEMQ_CORE_ACTIVEMQXASESSION_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXASession.h
------------------------------------------------------------------------------
    svn:eol-style = native