You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2009/04/01 23:49:42 UTC

svn commit: r761076 - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/activemq/core/ test-integration/

Author: tabish
Date: Wed Apr  1 21:49:41 2009
New Revision: 761076

URL: http://svn.apache.org/viewvc?rev=761076&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQCPP-209

Initial refactoring of Transaction processing and message acking in general; fixes several issues.  Transactions work; however, there are still some issues when shutting down while an uncommitted transaction is open.

Some code has been put in place to allow for support of optimized acknowledge and individual acknowledge later on.


Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionSupport.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConstants.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Synchronization.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionSupport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionSupport.h?rev=761076&r1=761075&r2=761076&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionSupport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionSupport.h Wed Apr  1 21:49:41 2009
@@ -96,26 +96,6 @@
         std::string clientId;
 
         /**
-         * Next available Producer Id
-         */
-        util::LongSequenceGenerator producerIds;
-
-        /**
-         * Next available Producer Sequence Id
-         */
-        util::LongSequenceGenerator producerSequenceIds;
-
-        /**
-         * Next available Consumer Id
-         */
-        util::LongSequenceGenerator consumerIds;
-
-        /**
-         * Next available Transaction Id
-         */
-        util::LongSequenceGenerator transactionIds;
-
-        /**
          * Next available Session Id.
          */
         util::LongSequenceGenerator sessionIds;
@@ -306,38 +286,6 @@
         }
 
         /**
-         * Get the Next available Producer Id
-         * @return the next id in the sequence.
-         */
-        long long getNextProducerId() {
-            return this->producerIds.getNextSequenceId();
-        }
-
-        /**
-         * Get the Next available Producer Sequence Id
-         * @return the next id in the sequence.
-         */
-        long long getNextProducerSequenceId() {
-            return this->producerSequenceIds.getNextSequenceId();
-        }
-
-        /**
-         * Get the Next available Consumer Id
-         * @return the next id in the sequence.
-         */
-        long long getNextConsumerId() {
-            return this->consumerIds.getNextSequenceId();
-        }
-
-        /**
-         * Get the Next available Transaction Id
-         * @return the next id in the sequence.
-         */
-        long long getNextTransactionId() {
-            return this->transactionIds.getNextSequenceId();
-        }
-
-        /**
          * Get the Next available Session Id.
          * @return the next id in the sequence.
          */

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConstants.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConstants.h?rev=761076&r1=761075&r2=761076&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConstants.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConstants.h Wed Apr  1 21:49:41 2009
@@ -53,10 +53,12 @@
         // Represents the Acknowledgement types that are supported for the
         // Message Ack Command.
         enum AckType {
-            ACK_TYPE_DELIVERED = 0,  // Message delivered but not consumed
-            ACK_TYPE_POISON    = 1,  // Message could not be processed due to
-                                     // poison pill but discard anyway
-            ACK_TYPE_CONSUMED  = 2   // Message consumed, discard
+            ACK_TYPE_DELIVERED   = 0,  // Message delivered but not consumed
+            ACK_TYPE_POISON      = 1,  // Message could not be processed due to
+                                       // poison pill but discard anyway
+            ACK_TYPE_CONSUMED    = 2,  // Message consumed, discard
+            ACK_TYPE_REDELIVERED = 3,  // Message has been re-delivered.
+            ACK_TYPE_INDIVIDUAL  = 4   // Acks a single message at a time.
         };
 
         /**

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp?rev=761076&r1=761075&r2=761076&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp Wed Apr  1 21:49:41 2009
@@ -20,6 +20,7 @@
 #include <decaf/lang/exceptions/InvalidStateException.h>
 #include <decaf/lang/exceptions/IllegalArgumentException.h>
 #include <decaf/util/Date.h>
+#include <decaf/lang/Math.h>
 #include <activemq/util/Config.h>
 #include <activemq/exceptions/ActiveMQException.h>
 #include <activemq/commands/Message.h>
@@ -32,6 +33,7 @@
 #include <activemq/core/ActiveMQSession.h>
 #include <activemq/core/ActiveMQTransactionContext.h>
 #include <cms/ExceptionListener.h>
+#include <memory>
 
 using namespace std;
 using namespace activemq;
@@ -44,6 +46,46 @@
 using namespace decaf::util::concurrent;
 
 ////////////////////////////////////////////////////////////////////////////////
+namespace activemq{
+namespace core {
+
+    class ConsumerSynhcronization : public Synchronization {
+    private:
+
+        ActiveMQConsumer* consumer;
+
+    public:
+
+        ConsumerSynhcronization( ActiveMQConsumer* consumer ) {
+
+            if( consumer == NULL ) {
+                throw NullPointerException(
+                    __FILE__, __LINE__, "Synchronization Created with NULL Consumer.");
+            }
+
+            this->consumer = consumer;
+        }
+
+        virtual void beforeEnd() throw( exceptions::ActiveMQException ) {
+            consumer->acknowledge();
+            consumer->setSynchronizationRegistered( false );
+        }
+
+        virtual void afterCommit() throw( exceptions::ActiveMQException ) {
+            consumer->commit();
+            consumer->setSynchronizationRegistered( false );
+        }
+
+        virtual void afterRollback() throw( exceptions::ActiveMQException ) {
+            consumer->rollback();
+            consumer->setSynchronizationRegistered( false );
+        }
+
+    };
+
+}}
+
+////////////////////////////////////////////////////////////////////////////////
 ActiveMQConsumer::ActiveMQConsumer( const Pointer<ConsumerInfo>& consumerInfo,
                                     ActiveMQSession* session,
                                     const Pointer<ActiveMQTransactionContext>& transaction ) {
@@ -60,6 +102,7 @@
     this->consumerInfo = consumerInfo;
     this->closed = false;
     this->lastDeliveredSequenceId = 0;
+    this->synchronizationRegistered = false;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -359,6 +402,10 @@
 
     this->lastDeliveredSequenceId = message->getMessageId()->getBrokerSequenceId();
 
+    synchronized( &dispatchedMessages ) {
+        dispatchedMessages.enqueueFront( message );
+    }
+
     // If the session is transacted then we hand off the message to it to
     // be stored for later redelivery.  We do need to check and see if we
     // are approaching the prefetch limit and send an Delivered ack just so
@@ -371,13 +418,7 @@
                 "In a Transacted Session but no Transaction Context set." );
         }
 
-        // Store the message in the transaction, we clone the message into the
-        // transaction so that there is a copy to commit if commit is called in
-        // the async onMessage method and also so we know that our copy can
-        // be deleted.
-// TODO
-//        transaction->addToTransaction(
-//            dynamic_cast<cms::Message*>( message )->clone(), this );
+        ackLater( message, ActiveMQConstants::ACK_TYPE_DELIVERED );
     }
 }
 
@@ -387,8 +428,31 @@
 
     try{
 
-        if( session->isAutoAcknowledge() || messageExpired ) {
-            this->acknowledge( message.get(), ActiveMQConstants::ACK_TYPE_CONSUMED );
+        if( messageExpired == true ) {
+            ackLater( message, ActiveMQConstants::ACK_TYPE_DELIVERED );
+        }
+
+        if( session->isAutoAcknowledge() ) {
+
+            if( this->deliveringAcks.compareAndSet( false, true ) ) {
+
+                synchronized( &dispatchedMessages ) {
+                    if( !dispatchedMessages.empty() ) {
+                        Pointer<MessageAck> ack = makeAckForAllDeliveredMessages(
+                            ActiveMQConstants::ACK_TYPE_CONSUMED );
+
+                        if( ack != NULL ) {
+                            dispatchedMessages.clear();
+                            session->oneway( ack );
+                        }
+                    }
+                }
+
+                this->deliveringAcks.set( false );
+            }
+
+        } else if( session->isClientAcknowledge() ) {
+            ackLater( message, ActiveMQConstants::ACK_TYPE_DELIVERED );
         }
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
@@ -397,15 +461,90 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::acknowledgeMessage( const commands::Message* message )
+void ActiveMQConsumer::ackLater( const Pointer<Message>& message, int ackType )
+    throw ( ActiveMQException ) {
+
+    // Don't acknowledge now, but we may need to let the broker know the
+    // consumer got the message to expand the pre-fetch window
+    if( session->isTransacted() ) {
+        session->doStartTransaction();
+        if( !synchronizationRegistered ) {
+            synchronizationRegistered = true;
+
+            Pointer<Synchronization> sync( new ConsumerSynhcronization( this ) );
+            this->transaction->addSynchronization( sync );
+        }
+    }
+
+    // The delivered message list is only needed for the recover method
+    // which is only used with client ack.
+    deliveredCounter++;
+
+    Pointer<MessageAck> oldPendingAck = pendingAck;
+    pendingAck.reset( new MessageAck() );
+    pendingAck->setConsumerId( this->consumerInfo->getConsumerId() );
+    pendingAck->setAckType( ackType );
+    pendingAck->setDestination( message->getDestination() );
+    pendingAck->setLastMessageId( message->getMessageId() );
+    pendingAck->setMessageCount( deliveredCounter );
+
+    if( oldPendingAck == NULL ) {
+        pendingAck->setFirstMessageId( pendingAck->getLastMessageId() );
+    } else {
+        pendingAck->setFirstMessageId( oldPendingAck->getFirstMessageId() );
+    }
+
+    if( session->isTransacted() ) {
+        pendingAck->setTransactionId( this->transaction->getTransactionId() );
+    }
+
+    if( ( 0.5 * this->consumerInfo->getPrefetchSize()) <= ( deliveredCounter - additionalWindowSize ) ) {
+        session->oneway( pendingAck );
+        pendingAck.reset( NULL );
+        additionalWindowSize = deliveredCounter;
+
+        // When using DUPS ok, we do a real ack.
+        if( ackType == ActiveMQConstants::ACK_TYPE_CONSUMED ) {
+            deliveredCounter = 0;
+            additionalWindowSize = 0;
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<MessageAck> ActiveMQConsumer::makeAckForAllDeliveredMessages( int type ) {
+
+    synchronized( &dispatchedMessages ) {
+
+        if( !dispatchedMessages.empty() ) {
+
+            Pointer<Message> message = dispatchedMessages.front();
+            Pointer<MessageAck> ack( new MessageAck() );
+            ack->setAckType( type );
+            ack->setConsumerId( this->consumerInfo->getConsumerId() );
+            ack->setDestination( message->getDestination() );
+            ack->setMessageCount( dispatchedMessages.size() );
+            ack->setLastMessageId( message->getMessageId() );
+            ack->setFirstMessageId( dispatchedMessages.back()->getMessageId() );
+
+            return ack;
+        }
+    }
+
+    return Pointer<MessageAck>();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumer::acknowledgeMessage( const commands::Message* message AMQCPP_UNUSED )
    throw ( cms::CMSException ) {
 
     try{
 
         this->checkClosed();
 
-        // Send an ack indicating that the client has consumed the message
-        this->acknowledge( message, ActiveMQConstants::ACK_TYPE_CONSUMED );
+        if( this->session->isClientAcknowledge() ) {
+            this->acknowledge();
+        }
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -413,41 +552,37 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::acknowledge( const commands::Message* message, int ackType )
-    throw ( cms::CMSException ) {
+void ActiveMQConsumer::acknowledge() throw ( cms::CMSException ) {
 
     try{
 
-        this->checkClosed();
+        synchronized( &dispatchedMessages ) {
 
-        if( message == NULL ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "ActiveMQConsumer::acknowledge - Message passed to Ack was NULL.");
-        }
+            // Acknowledge all messages so far.
+            Pointer<MessageAck> ack =
+                makeAckForAllDeliveredMessages( ActiveMQConstants::ACK_TYPE_CONSUMED );
 
-        Pointer<MessageAck> ack( new MessageAck() );
-        ack->setAckType( (int)ackType );
-        ack->setConsumerId( this->consumerInfo->getConsumerId() );
-        ack->setDestination( message->getDestination() );
-        ack->setFirstMessageId( message->getMessageId() );
-        ack->setLastMessageId( message->getMessageId() );
-        ack->setMessageCount( 1 );
+            if( ack == NULL ) {
+                return;
+            }
 
-        if( this->session->getAcknowledgeMode() == cms::Session::SESSION_TRANSACTED ) {
+            if( session->isTransacted() ) {
+                session->doStartTransaction();
+                ack->setTransactionId( transaction->getTransactionId() );
+            }
 
-            if( this->transaction == NULL ) {
+            session->oneway( ack );
+            pendingAck.reset( NULL );
 
-                throw ActiveMQException(
-                        __FILE__, __LINE__,
-                        "ActiveMQConsumer::acknowledge - "
-                        "Transacted Session, has no Transaction Info.");
-            }
+            // Adjust the counters
+            deliveredCounter -= dispatchedMessages.size();
+            additionalWindowSize =
+                Math::max( 0, additionalWindowSize - dispatchedMessages.size() );
 
-            ack->setTransactionId( this->transaction->getTransactionId() );
+            if( !session->isTransacted() ) {
+                dispatchedMessages.clear();
+            }
         }
-
-        this->session->oneway( ack );
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -455,6 +590,112 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumer::commit() throw( ActiveMQException ) {
+
+    synchronized( &dispatchedMessages ) {
+        dispatchedMessages.clear();
+    }
+    redeliveryDelay = 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumer::rollback() throw( ActiveMQException ) {
+
+    synchronized( &unconsumedMessages ) {
+
+        synchronized( &dispatchedMessages ) {
+            if( dispatchedMessages.empty() ) {
+                return;
+            }
+
+            // Only increase the redelivery delay after the first redelivery..
+            Pointer<Message> lastMsg = dispatchedMessages.front();
+            const int currentRedeliveryCount = lastMsg->getRedeliveryCounter();
+            if( currentRedeliveryCount > 0 ) {
+                redeliveryDelay = transaction->getRedeliveryDelay();
+            }
+
+            Pointer<MessageId> firstMsgId = dispatchedMessages.back()->getMessageId();
+
+            std::auto_ptr< Iterator< Pointer<Message> > > iter( dispatchedMessages.iterator() );
+
+            while( iter->hasNext() ) {
+                Pointer<Message> message = iter->next();
+                message->setRedeliveryCounter( message->getRedeliveryCounter() + 1 );
+            }
+
+            if( lastMsg->getRedeliveryCounter() > this->transaction->getMaximumRedeliveries() ) {
+
+                // We need to NACK the messages so that they get sent to the DLQ.
+                // Acknowledge the last message.
+                Pointer<MessageAck> ack( new MessageAck() );
+                ack->setAckType( ActiveMQConstants::ACK_TYPE_POISON );
+                ack->setConsumerId( this->consumerInfo->getConsumerId() );
+                ack->setDestination( lastMsg->getDestination() );
+                ack->setMessageCount( dispatchedMessages.size() );
+                ack->setLastMessageId( lastMsg->getMessageId() );
+                ack->setFirstMessageId( firstMsgId );
+
+                session->oneway( ack );
+                // Adjust the window size.
+                additionalWindowSize =
+                    Math::max( 0, additionalWindowSize - dispatchedMessages.size() );
+                redeliveryDelay = 0;
+
+            } else {
+
+                // only redelivery_ack after first delivery
+                if( currentRedeliveryCount > 0 ) {
+                    Pointer<MessageAck> ack( new MessageAck() );
+                    ack->setAckType( ActiveMQConstants::ACK_TYPE_REDELIVERED );
+                    ack->setConsumerId( this->consumerInfo->getConsumerId() );
+                    ack->setDestination( lastMsg->getDestination() );
+                    ack->setMessageCount( dispatchedMessages.size() );
+                    ack->setLastMessageId( lastMsg->getMessageId() );
+                    ack->setFirstMessageId( firstMsgId );
+
+                    session->oneway( ack );
+                }
+
+//                // stop the delivery of messages.
+//                unconsumedMessages.stop();
+
+                std::auto_ptr< Iterator< Pointer<Message> > > iter( dispatchedMessages.iterator() );
+
+                while( iter->hasNext() ) {
+                    DispatchData dispatch( this->consumerInfo->getConsumerId(), iter->next() );
+                    unconsumedMessages.enqueueFront( dispatch );
+                }
+
+//                if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) {
+//                    // Start up the delivery again a little later.
+//                    scheduler.executeAfterDelay(new Runnable() {
+//                        public void run() {
+//                            try {
+//                                if (started.get()) {
+//                                    start();
+//                                }
+//                            } catch (JMSException e) {
+//                                session.connection.onAsyncException(e);
+//                            }
+//                        }
+//                    }, redeliveryDelay);
+//                } else {
+//                    start();
+//                }
+
+            }
+            deliveredCounter -= dispatchedMessages.size();
+            dispatchedMessages.clear();
+        }
+    }
+
+    if( this->listener.get() != NULL ) {
+        session->redispatch( unconsumedMessages );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumer::dispatch( DispatchData& data ) {
 
     try {
@@ -463,7 +704,7 @@
 
         // Don't dispatch expired messages, ack it and then destroy it
         if( message->isExpired() ) {
-            this->acknowledge( message.get(), ActiveMQConstants::ACK_TYPE_CONSUMED );
+            this->ackLater( message, ActiveMQConstants::ACK_TYPE_CONSUMED );
 
             // stop now, don't queue
             return;

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h?rev=761076&r1=761075&r2=761076&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h Wed Apr  1 21:49:41 2009
@@ -25,11 +25,13 @@
 #include <activemq/util/Config.h>
 #include <activemq/exceptions/ActiveMQException.h>
 #include <activemq/commands/ConsumerInfo.h>
+#include <activemq/commands/MessageAck.h>
 #include <activemq/core/ActiveMQAckHandler.h>
 #include <activemq/core/ActiveMQTransactionContext.h>
 #include <activemq/core/Dispatcher.h>
 
 #include <decaf/util/concurrent/atomic/AtomicReference.h>
+#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
 #include <decaf/lang/Pointer.h>
 #include <decaf/util/StlQueue.h>
 #include <decaf/util/concurrent/Mutex.h>
@@ -40,6 +42,7 @@
 
     using decaf::lang::Pointer;
     using decaf::util::concurrent::atomic::AtomicReference;
+    using decaf::util::concurrent::atomic::AtomicBoolean;
 
     class ActiveMQSession;
 
@@ -71,6 +74,11 @@
         AtomicReference<cms::MessageListener> listener;
 
         /**
+         * Is the consumer currently delivering acks.
+         */
+        AtomicBoolean deliveringAcks;
+
+        /**
          * Queue of unconsumed messages.
          */
         decaf::util::StlQueue<DispatchData> unconsumedMessages;
@@ -86,6 +94,31 @@
         long long lastDeliveredSequenceId;
 
         /**
+         * Next Ack to go out.
+         */
+        Pointer<commands::MessageAck> pendingAck;
+
+        /**
+         * How many message's have been delivered so far since the last Ack was sent.
+         */
+        int deliveredCounter;
+
+        /**
+         * How big to grow the ack window next time.
+         */
+        int additionalWindowSize;
+
+        /**
+         * Time to wait before restarting delivery of rollback messages.
+         */
+        long redeliveryDelay;
+
+        /**
+         * Has the Synchronization been added for this transaction
+         */
+        volatile bool synchronizationRegistered;
+
+        /**
          * Boolean that indicates if the consumer has been closed
          */
         bool closed;
@@ -178,15 +211,22 @@
     public:  // ActiveMQConsumer Methods
 
         /**
-         * Method called to acknowledge the message passed, ack it using
-         * the passed in ackType, see <code>Connector</code> for a list
-         * of the correct ack types.
-         * @param message the Message to Acknowledge
-         * @param ackType the Type of ack to send, (connector enum)
+         * Method called to acknowledge all messages that have been received so far.
          * @throw CMSException
          */
-        virtual void acknowledge( const commands::Message* message, int ackType )
-            throw ( cms::CMSException );
+        void acknowledge() throw ( cms::CMSException );
+
+        /**
+         * Called to Commit the current set of messages in this Transaction
+         * @throw ActiveMQException
+         */
+        void commit() throw ( exceptions::ActiveMQException );
+
+        /**
+         * Called to Roll back the current set of messages in this Transaction
+         * @throw ActiveMQException
+         */
+        void rollback() throw ( exceptions::ActiveMQException );
 
         /**
          * Get the Consumer information for this consumer
@@ -213,6 +253,22 @@
             return this->closed;
         }
 
+        /**
+         * Has this Consumer Transaction Synchronization been added to the transaction
+         * @return true if the synchronization has been added.
+         */
+        bool issynchronizationRegistered() const {
+            return this->synchronizationRegistered;
+        }
+
+        /**
+         * Sets the Synchronization Registered state of this consumer.
+         * @param value - true if registered false otherwise.
+         */
+        void setSynchronizationRegistered( bool value ) {
+            this->synchronizationRegistered = value;
+        }
+
     protected:
 
         /**
@@ -257,7 +313,7 @@
         /**
          * If supported sends a message pull request to the service provider asking
          * for the delivery of a new message.  This is used in the case where the
-         * service provider has been configured with a zero prefectch or is only
+         * service provider has been configured with a zero prefetch or is only
          * capable of delivering messages on a pull basis.  No request is made if
          * there are already messages in the unconsumed queue since there's no need
          * for a server round-trip in that instance.
@@ -269,6 +325,15 @@
         // Checks for the closed state and throws if so.
         void checkClosed() const throw( exceptions::ActiveMQException );
 
+        // Sends an ack as needed in order to keep them coming in if the current
+        // ack mode allows the consumer to receive up to the prefetch limit before
+        // an real ack is sent.
+        void ackLater( const Pointer<commands::Message>& message, int ackType )
+            throw ( exceptions::ActiveMQException );
+
+        // Create an Ack Message that acks all messages that have been delivered so far.
+        Pointer<commands::MessageAck> makeAckForAllDeliveredMessages( int type );
+
     };
 
 }}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp?rev=761076&r1=761075&r2=761076&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp Wed Apr  1 21:49:41 2009
@@ -33,7 +33,7 @@
 
 ////////////////////////////////////////////////////////////////////////////////
 ActiveMQProducer::ActiveMQProducer( const Pointer<commands::ProducerInfo>& producerInfo,
-                                    const cms::Destination* destination,
+                                    const Pointer<cms::Destination>& destination,
                                     ActiveMQSession* session ) {
 
     if( session == NULL || producerInfo == NULL ) {
@@ -45,7 +45,7 @@
     // Init Producer Data
     this->session = session;
     this->producerInfo = producerInfo;
-    this->destination.reset( destination != NULL ? destination->clone() : NULL );
+    this->destination = destination;
     this->closed = false;
 
     // Default the Delivery options
@@ -54,8 +54,6 @@
     this->disableMessageId = false;
     this->defaultPriority = 4;
     this->defaultTimeToLive = 0;
-
-    this->session->syncRequest( this->producerInfo );
 }
 
 ////////////////////////////////////////////////////////////////////////////////

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h?rev=761076&r1=761075&r2=761076&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h Wed Apr  1 21:49:41 2009
@@ -71,7 +71,7 @@
         std::auto_ptr<util::MemoryUsage> memoryUsage;
 
         // The Destination assigned at creation, NULL if not assigned.
-        std::auto_ptr<cms::Destination> destination;
+        Pointer<cms::Destination> destination;
 
     public:
 
@@ -87,7 +87,7 @@
          *        The Session which is the parent of this Producer.
          */
         ActiveMQProducer( const Pointer<commands::ProducerInfo>& producerInfo,
-                          const cms::Destination* destination,
+                          const Pointer<cms::Destination>& destination,
                           ActiveMQSession* session );
 
         virtual ~ActiveMQProducer();

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?rev=761076&r1=761075&r2=761076&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Wed Apr  1 21:49:41 2009
@@ -116,6 +116,12 @@
         // Stop the dispatch executor.
         stop();
 
+        // TODO = Commit it first.  ??
+        // Destroy the Transaction
+        if( this->transaction.get() != NULL && this->transaction->isInTransaction() ){
+            this->transaction->commit();
+        }
+
         // Close all Consumers
         synchronized( &this->consumers ) {
 
@@ -144,13 +150,6 @@
             }
         }
 
-        // TODO = Commit it first.  ??
-        // Destroy the Transaction
-        if( this->transaction.get() != NULL ){
-            this->transaction->commit();
-            this->transaction.release();
-        }
-
         // Remove this session from the Broker.
         this->connection->disposeOf( this->sessionInfo->getSessionId() );
 
@@ -199,8 +198,18 @@
                 "ActiveMQSession::rollback - This Session is not Transacted" );
         }
 
-        // Rollback the Transaction
+        bool started = this->executor->isStarted();
+
+        if( started ) {
+            this->executor->stop();
+        }
+
+        // Roll back the Transaction
         this->transaction->rollback();
+
+        if( started ) {
+            this->executor->start();
+        }
     }
     AMQ_CATCH_NOTHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -337,10 +346,13 @@
 
         this->checkClosed();
 
+        Pointer<cms::Destination> clonedDestination(
+            destination != NULL ? destination->clone() : NULL );
+
         decaf::lang::Pointer<commands::ProducerId> producerId( new commands::ProducerId() );
         producerId->setConnectionId( this->sessionInfo->getSessionId()->getConnectionId() );
         producerId->setSessionId( this->sessionInfo->getSessionId()->getValue() );
-        producerId->setValue( this->connection->getNextProducerId() );
+        producerId->setValue( this->getNextProducerId() );
 
         Pointer<commands::ProducerInfo> producerInfo( new commands::ProducerInfo() );
         producerInfo->setProducerId( producerId );
@@ -348,22 +360,16 @@
 
         // Producers are allowed to have NULL destinations.  In this case, the
         // destination is specified by the messages as they are sent.
-        if( destination != NULL ) {
+        if( clonedDestination != NULL ) {
 
             // Cast the destination to an OpenWire destination, so we can
             // get all the goodies.
-            const commands::ActiveMQDestination* amqDestination =
-                dynamic_cast<const commands::ActiveMQDestination*>( destination );
-            if( amqDestination == NULL ) {
-                throw ActiveMQException( __FILE__, __LINE__,
-                    "Destination is not a valid Type: commands::ActiveMQDestination" );
-            }
+            Pointer<commands::ActiveMQDestination> amqDestination =
+                clonedDestination.dynamicCast<commands::ActiveMQDestination>();
 
             // Get any options specified in the destination and apply them to the
             // ProducerInfo object.
-            producerInfo->setDestination(
-                decaf::lang::Pointer<commands::ActiveMQDestination>(
-                    amqDestination->cloneDataStructure() ) );
+            producerInfo->setDestination( amqDestination );
             const ActiveMQProperties& options = amqDestination->getOptions();
             producerInfo->setDispatchAsync( Boolean::parseBoolean(
                 options.getProperty( "producer.dispatchAsync", "false" )) );
@@ -371,7 +377,7 @@
 
         // Create the producer instance.
         std::auto_ptr<ActiveMQProducer> producer(
-            new ActiveMQProducer( producerInfo, destination, this ) );
+            new ActiveMQProducer( producerInfo, clonedDestination, this ) );
 
         producer->setSendTimeout( this->connection->getSendTimeout() );
 
@@ -383,6 +389,8 @@
         // Add to the Connections list
         this->connection->addProducer( producer.get() );
 
+        this->syncRequest( producerInfo );
+
         return producer.release();
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
@@ -600,12 +608,16 @@
         // flag.  Not adding a message ID will cause an NPE at the broker.
         decaf::lang::Pointer<commands::MessageId> id( new commands::MessageId() );
         id->setProducerId( producer->getProducerInfo().getProducerId() );
-        id->setProducerSequenceId( this->connection->getNextProducerSequenceId() );
+        id->setProducerSequenceId( this->getNextProducerSequenceId() );
 
         amqMessage->setMessageId( id );
 
         if( this->getAcknowledgeMode() == cms::Session::SESSION_TRANSACTED ) {
 
+            // Ensure that a new transaction is started if this is the first message
+            // sent since the last commit.
+            doStartTransaction();
+
             if( this->transaction.get() == NULL ) {
                 throw ActiveMQException(
                     __FILE__, __LINE__,
@@ -749,7 +761,7 @@
         consumerId->setConnectionId(
             this->connection->getConnectionId().getValue() );
         consumerId->setSessionId( this->sessionInfo->getSessionId()->getValue() );
-        consumerId->setValue( this->connection->getNextSessionId() );
+        consumerId->setValue( this->getNextConsumerId() );
 
         consumerInfo->setConsumerId( consumerId );
 
@@ -932,23 +944,6 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-commands::TransactionId* ActiveMQSession::createLocalTransactionId()
-    throw ( activemq::exceptions::ActiveMQException ) {
-
-    try{
-        std::auto_ptr<LocalTransactionId> id( new LocalTransactionId() );
-
-        id->setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
-        id->setValue( this->connection->getNextTransactionId() );
-
-        return id.release();
-    }
-    AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
-    AMQ_CATCHALL_THROW( activemq::exceptions::ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSession::oneway( Pointer<Command> command )
     throw ( activemq::exceptions::ActiveMQException ) {
 
@@ -1007,12 +1002,6 @@
                 this->connection->disposeOf( id );
                 this->consumers.remove( id );
 
-                //TODO
-//              // Remove this consumer from the Transaction if we are transacted
-//              if( transaction != NULL ) {
-//                  transaction->removeFromTransaction( consumer->getConsumerId() );
-//              }
-//
                 // Clean up any resources in the executor for this consumer
                 if( this->executor.get() != NULL ) {
 
@@ -1064,3 +1053,13 @@
     }
     return NULL;
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::doStartTransaction() throw ( ActiveMQException ) {
+
+    if( !this->isTransacted() ) {
+        throw ActiveMQException( __FILE__, __LINE__, "Not a Transacted Session" );
+    }
+
+    this->transaction->begin();
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h?rev=761076&r1=761075&r2=761076&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h Wed Apr  1 21:49:41 2009
@@ -30,6 +30,7 @@
 #include <activemq/commands/ProducerId.h>
 #include <activemq/commands/TransactionId.h>
 #include <activemq/core/Dispatcher.h>
+#include <activemq/util/LongSequenceGenerator.h>
 
 #include <decaf/util/StlMap.h>
 #include <decaf/util/StlQueue.h>
@@ -104,6 +105,21 @@
          */
         cms::Session::AcknowledgeMode ackMode;
 
+        /**
+         * Next available Producer Id
+         */
+        util::LongSequenceGenerator producerIds;
+
+        /**
+         * Next available Producer Sequence Id
+         */
+        util::LongSequenceGenerator producerSequenceIds;
+
+        /**
+         * Next available Consumer Id
+         */
+        util::LongSequenceGenerator consumerIds;
+
     public:
 
         ActiveMQSession( const Pointer<commands::SessionInfo>& sessionInfo,
@@ -448,8 +464,41 @@
         void disposeOf( decaf::lang::Pointer<commands::ProducerId> id )
             throw ( activemq::exceptions::ActiveMQException );
 
+        /**
+         * Starts a Transaction for this Session if one is not already in progress.
+         * If the session is not a Transacted Session then an exception is thrown.  If a
+         * transaction is already in progress then this method has no effect.
+         *
+         * @throw ActiveMQException if this is not a Transacted Session.
+         */
+        void doStartTransaction() throw ( exceptions::ActiveMQException );
+
    private:
 
+       /**
+        * Get the Next available Producer Id
+        * @return the next id in the sequence.
+        */
+       long long getNextProducerId() {
+           return this->producerIds.getNextSequenceId();
+       }
+
+       /**
+        * Get the Next available Producer Sequence Id
+        * @return the next id in the sequence.
+        */
+       long long getNextProducerSequenceId() {
+           return this->producerSequenceIds.getNextSequenceId();
+       }
+
+       /**
+        * Get the Next available Consumer Id
+        * @return the next id in the sequence.
+        */
+       long long getNextConsumerId() {
+           return this->consumerIds.getNextSequenceId();
+       }
+
        // Checks for the closed state and throws if so.
        void checkClosed() const throw( exceptions::ActiveMQException );
 
@@ -484,12 +533,6 @@
        std::string createTemporaryDestinationName()
            throw ( activemq::exceptions::ActiveMQException );
 
-       // Create a Transaction Id using the local context to create
-       // the LocalTransactionId Command.
-       // @returns a new TransactionId pointer, caller owns.
-       commands::TransactionId* createLocalTransactionId()
-           throw ( activemq::exceptions::ActiveMQException );
-
     };
 
 }}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp?rev=761076&r1=761075&r2=761076&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp Wed Apr  1 21:49:41 2009
@@ -22,6 +22,7 @@
 #include <activemq/commands/TransactionInfo.h>
 #include <decaf/lang/exceptions/NullPointerException.h>
 #include <decaf/lang/Integer.h>
+#include <decaf/lang/Long.h>
 #include <decaf/util/Iterator.h>
 
 using namespace std;
@@ -53,12 +54,10 @@
         this->session = session;
         this->connection = session->getConnection();
 
-        // convert from property Strings to int.
-        this->maxRedeliveries = Integer::parseInt(
+        maximumRedeliveries = Integer::parseInt(
             properties.getProperty( "transaction.maxRedeliveryCount", "5" ) );
-
-        // Start a new Transaction
-        this->startTransaction();
+        redeliveryDelay = Long::parseLong(
+            properties.getProperty( "transaction.redeliveryDelay", "0" ) );
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -67,23 +66,10 @@
 
 ////////////////////////////////////////////////////////////////////////////////
 ActiveMQTransactionContext::~ActiveMQTransactionContext() {
-
-    try{
-
-        // TODO
-        // Inform the connector we are rolling back before we close so that
-        // the provider knows we didn't complete this transaction
-//        connection->getConnectionData()->getConnector()->
-//            rollback( transactionInfo, session->getSessionInfo() );
-
-    }
-    AMQ_CATCH_NOTHROW( ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
-    AMQ_CATCHALL_NOTHROW( )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQTransactionContext::addSynchronization( Synchronization* sync ) {
+void ActiveMQTransactionContext::addSynchronization( const Pointer<Synchronization>& sync ) {
 
     synchronized( &this->synchronizations ) {
         this->synchronizations.add( sync );
@@ -91,7 +77,7 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQTransactionContext::removeSynchronization( Synchronization* sync ) {
+void ActiveMQTransactionContext::removeSynchronization( const Pointer<Synchronization>& sync ) {
 
     synchronized( &this->synchronizations ) {
         this->synchronizations.remove( sync );
@@ -99,56 +85,63 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void ActiveMQTransactionContext::begin()
+    throw ( activemq::exceptions::ActiveMQException ) {
+
+    try{
+        if( !isInTransaction() ) {
+
+            this->synchronizations.clear();
+
+            // Create the Id
+            Pointer<LocalTransactionId> id( new LocalTransactionId() );
+            id->setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
+            id->setValue( this->getNextTransactionId() );
+
+            // Create and Populate the Info Command.
+            Pointer<TransactionInfo> transactionInfo( new TransactionInfo() );
+            transactionInfo->setConnectionId( id->getConnectionId() );
+            transactionInfo->setTransactionId( id );
+            transactionInfo->setType( ActiveMQConstants::TRANSACTION_STATE_BEGIN );
+
+            this->connection->oneway( transactionInfo );
+
+            this->transactionId = id.dynamicCast<TransactionId>();
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void ActiveMQTransactionContext::commit()
     throw ( activemq::exceptions::ActiveMQException ) {
 
     try{
 
-        if( this->transactionInfo.get() == NULL ||
-            this->transactionInfo->getTransactionId() == NULL ) {
+        if( this->transactionId.get() == NULL ) {
             throw InvalidStateException(
                 __FILE__, __LINE__,
                 "ActiveMQTransactionContext::commit - "
                 "Commit called before transaction was started.");
         }
 
-        // Stop any deliveries
-        this->session->stop();
-
-        // Notify each registered Synchronization that we are committing this Transaction.
-        synchronized( &this->synchronizations ) {
+        this->beforeEnd();
 
-            std::auto_ptr< decaf::util::Iterator<Synchronization*> > iter(
-                this->synchronizations.iterator() );
+        // Create and Populate the Info Command.
+        Pointer<TransactionInfo> info( new TransactionInfo() );
+        info->setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
+        info->setTransactionId( this->transactionId );
+        info->setType( ActiveMQConstants::TRANSACTION_STATE_COMMITONEPHASE );
 
-            while( iter->hasNext() ) {
-                iter->next()->beforeCommit();
-            }
-        }
+        // Before we send the command null the id in case of an exception.
+        this->transactionId.reset( NULL );
 
         // Commit the current Transaction
-        this->transactionInfo->setType( ActiveMQConstants::TRANSACTION_STATE_COMMITONEPHASE );
-        this->connection->oneway( this->transactionInfo );
-
-        // Notify each registered Synchronization that we have committed this Transaction.
-        synchronized( &this->synchronizations ) {
-
-            std::auto_ptr<decaf::util::Iterator<Synchronization*> > iter(
-                this->synchronizations.iterator() );
+        this->connection->oneway( info );
 
-            while( iter->hasNext() ) {
-                iter->next()->afterCommit();
-            }
-        }
-
-        // Clear all the Synchronizations.
-        this->clearSynchronizations();
-
-        // Start a new Transaction
-        this->startTransaction();
-
-        // Stop any deliveries
-        this->session->start();
+        this->afterCommit();
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -161,40 +154,28 @@
 
     try{
 
-        if( this->transactionInfo.get() == NULL ||
-            this->transactionInfo->getTransactionId() == NULL ) {
+        if( this->transactionId == NULL ) {
             throw InvalidStateException(
                 __FILE__, __LINE__,
                 "ActiveMQTransactionContext::rollback - "
                 "Rollback called before transaction was started.");
         }
 
-        // Stop any Deliveries
-        this->session->stop();
-
-        // Rollback the Transaction
-        this->transactionInfo->setType( ActiveMQConstants::TRANSACTION_STATE_ROLLBACK );
-        this->connection->oneway( this->transactionInfo );
-
-        // Notify each registered Synchronization that we are committing this Transaction.
-        synchronized( &this->synchronizations ) {
-
-            std::auto_ptr<decaf::util::Iterator<Synchronization*> > iter(
-                this->synchronizations.iterator() );
+        this->beforeEnd();
 
-            while( iter->hasNext() ) {
-                iter->next()->beforeCommit();
-            }
-        }
+        // Create and Populate the Info Command.
+        Pointer<TransactionInfo> info( new TransactionInfo() );
+        info->setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
+        info->setTransactionId( this->transactionId );
+        info->setType( ActiveMQConstants::TRANSACTION_STATE_ROLLBACK );
 
-        // Clear all the Synchronizations.
-        this->clearSynchronizations();
+        // Before we send the command null the id in case of an exception.
+        this->transactionId.reset( NULL );
 
-        // Start a new Transaction
-        this->startTransaction();
+        // Roll back the current Transaction
+        this->connection->oneway( info );
 
-        // Start Deliveries
-        this->session->start();
+        this->afterRollback();
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -202,50 +183,46 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQTransactionContext::startTransaction() throw( activemq::exceptions::ActiveMQException ) {
-
-    try{
+void ActiveMQTransactionContext::beforeEnd() {
 
-        this->transactionInfo.reset( new TransactionInfo() );
+    // Notify each registered Synchronization that we are ending this Transaction.
+    synchronized( &this->synchronizations ) {
 
-        // Create the Id
-        Pointer<LocalTransactionId> id( new LocalTransactionId() );
-        id->setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
-        id->setValue( this->connection->getNextTransactionId() );
-
-        // Populate the Info Command.
-        this->transactionInfo->setConnectionId( id->getConnectionId() );
-        this->transactionInfo->setTransactionId( id );
-        this->transactionInfo->setType( ActiveMQConstants::TRANSACTION_STATE_BEGIN );
+        std::auto_ptr<decaf::util::Iterator< Pointer<Synchronization> > > iter(
+            this->synchronizations.iterator() );
 
-        this->connection->oneway( this->transactionInfo );
+        while( iter->hasNext() ) {
+            iter->next()->beforeEnd();
+        }
     }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQTransactionContext::clearSynchronizations()
-    throw( activemq::exceptions::ActiveMQException ) {
+void ActiveMQTransactionContext::afterCommit() {
 
+    // Notify each registered Synchronization that we committed this Transaction.
+    synchronized( &this->synchronizations ) {
 
-    try{
+        std::auto_ptr<decaf::util::Iterator< Pointer<Synchronization> > > iter(
+            this->synchronizations.iterator() );
 
-        // delete each of the Synchronizations and then clear the Set.
-        synchronized( &this->synchronizations ) {
+        while( iter->hasNext() ) {
+            iter->next()->afterCommit();
+        }
+    }
+}
 
-            std::auto_ptr<decaf::util::Iterator<Synchronization*> > iter(
-                this->synchronizations.iterator() );
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQTransactionContext::afterRollback() {
 
-            while( iter->hasNext() ) {
-                delete iter->next();
-            }
+    // Notify each registered Synchronization that we rolled back this Transaction.
+    synchronized( &this->synchronizations ) {
 
-            this->synchronizations.clear();
+        std::auto_ptr<decaf::util::Iterator< Pointer<Synchronization> > > iter(
+            this->synchronizations.iterator() );
+
+        while( iter->hasNext() ) {
+            iter->next()->afterRollback();
         }
     }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.h?rev=761076&r1=761075&r2=761076&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.h Wed Apr  1 21:49:41 2009
@@ -24,12 +24,11 @@
 
 #include <activemq/util/Config.h>
 #include <activemq/exceptions/ActiveMQException.h>
-#include <activemq/commands/TransactionInfo.h>
-#include <activemq/commands/TransactionId.h>
+#include <activemq/commands/LocalTransactionId.h>
 #include <activemq/core/Synchronization.h>
+#include <activemq/util/LongSequenceGenerator.h>
 
 #include <decaf/lang/exceptions/InvalidStateException.h>
-#include <decaf/lang/exceptions/IllegalArgumentException.h>
 #include <decaf/util/StlSet.h>
 #include <decaf/util/Properties.h>
 #include <decaf/util/concurrent/Mutex.h>
@@ -65,16 +64,20 @@
         ActiveMQConnection* connection;
 
         // Transaction Info for the current Transaction
-        Pointer<commands::TransactionInfo> transactionInfo;
+        Pointer<commands::TransactionId> transactionId;
 
         // List of Registered Synchronizations
-        decaf::util::StlSet<Synchronization*> synchronizations;
+        decaf::util::StlSet< Pointer<Synchronization> > synchronizations;
 
-        // Lock object to protect the rollback Map
-        decaf::util::concurrent::Mutex mutex;
+        // Next available Transaction Id
+        util::LongSequenceGenerator transactionIds;
 
-        // Max number of redeliveries per message
-        unsigned int maxRedeliveries;
+        // Maximum number of times to redeliver a message when a Transaction is
+        // rolled back.
+        int maximumRedeliveries;
+
+        // Time to wait before starting delivery again.
+        long long redeliveryDelay;
 
     public:
 
@@ -92,57 +95,73 @@
          * Adds a Synchronization to this Transaction.
          * @param sync - The Synchronization instance to add.
          */
-        virtual void addSynchronization( Synchronization* sync );
+        virtual void addSynchronization( const Pointer<Synchronization>& sync );
 
         /**
          * Removes a Synchronization to this Transaction.
          * @param sync - The Synchronization instance to add.
          */
-        virtual void removeSynchronization( Synchronization* sync );
+        virtual void removeSynchronization( const Pointer<Synchronization>& sync );
+
+        /**
+         * Begins a new transaction if one is not currently in progress.
+         * @throw ActiveMQException
+         */
+        virtual void begin() throw ( exceptions::ActiveMQException );
 
         /**
          * Commit the current Transaction
-         * @throw CMSException
+         * @throw ActiveMQException
          */
         virtual void commit() throw ( exceptions::ActiveMQException );
 
         /**
          * Rollback the current Transaction
-         * @throw CMSException
+         * @throw ActiveMQException
          */
         virtual void rollback() throw ( exceptions::ActiveMQException );
 
         /**
-         * Get the Transaction Information object for the current
-         * Transaction, returns NULL if no transaction is running
-         * @return TransactionInfo
-         */
-        virtual const commands::TransactionInfo* getTransactionInfo() const {
-            return transactionInfo.get();
-        }
-
-        /**
          * Get the Transaction Id object for the current
          * Transaction, returns NULL if no transaction is running
          * @return TransactionInfo
+         * @throw InvalidStateException if a Transaction is not in progress.
          */
         virtual const decaf::lang::Pointer<commands::TransactionId>& getTransactionId() const {
-            if( this->transactionInfo.get() == NULL ) {
+            if( this->transactionId == NULL ) {
                 throw decaf::lang::exceptions::InvalidStateException(
                     __FILE__, __LINE__, "Transaction Not Started." );
             }
 
-            return transactionInfo->getTransactionId();
+            return transactionId;
+        }
+
+        /**
+         * Checks to see if there is currently a Transaction in progress; returns
+         * false if not, true otherwise.
+         * @return true if a transaction is in progress.
+         */
+        virtual bool isInTransaction() const {
+            return this->transactionId != NULL;
+        }
+
+        int getMaximumRedeliveries() const {
+            return this->maximumRedeliveries;
+        }
+
+        long long getRedeliveryDelay() const {
+            return this->redeliveryDelay;
         }
 
     private:
 
-        // Remove and Delete all Synchronizations from the Set of registered
-        // Synchronizations in this Transaction.
-        void clearSynchronizations() throw( exceptions::ActiveMQException );
+        long long getNextTransactionId() {
+            return this->transactionIds.getNextSequenceId();
+        }
 
-        // Starts a new Transaction.
-        void startTransaction() throw( exceptions::ActiveMQException );
+        void beforeEnd();
+        void afterCommit();
+        void afterRollback();
 
     };
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Synchronization.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Synchronization.h?rev=761076&r1=761075&r2=761076&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Synchronization.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Synchronization.h Wed Apr  1 21:49:41 2009
@@ -33,7 +33,7 @@
 
         virtual ~Synchronization() {}
 
-        virtual void beforeCommit() throw( exceptions::ActiveMQException ) = 0;
+        virtual void beforeEnd() throw( exceptions::ActiveMQException ) = 0;
 
         virtual void afterCommit() throw( exceptions::ActiveMQException ) = 0;
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp?rev=761076&r1=761075&r2=761076&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp Wed Apr  1 21:49:41 2009
@@ -41,9 +41,9 @@
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireCmsTemplateTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireDurableTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireExpirationTest );
-//CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireSimpleRollbackTest );
+CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireSimpleRollbackTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireSimpleTest );
-//CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireTransactionTest );
+CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireTransactionTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireSlowListenerTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireTempDestinationTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireJmsMessageGroupsTest );