You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2009/04/02 16:49:54 UTC

svn commit: r761316 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core: ActiveMQConsumer.cpp ActiveMQConsumer.h ActiveMQSession.cpp

Author: tabish
Date: Thu Apr  2 14:49:54 2009
New Revision: 761316

URL: http://svn.apache.org/viewvc?rev=761316&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQCPP-209

Fixed the issues I was seeing at shutdown, not perfect but its a start, a real solution is going to require more redesign of the shutdown logic.

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp?rev=761316&r1=761315&r2=761316&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp Thu Apr  2 14:49:54 2009
@@ -49,14 +49,14 @@
 namespace activemq{
 namespace core {
 
-    class ConsumerSynhcronization : public Synchronization {
+    class TransactionSynhcronization : public Synchronization {
     private:
 
         ActiveMQConsumer* consumer;
 
     public:
 
-        ConsumerSynhcronization( ActiveMQConsumer* consumer ) {
+        TransactionSynhcronization( ActiveMQConsumer* consumer ) {
 
             if( consumer == NULL ) {
                 throw NullPointerException(
@@ -66,6 +66,8 @@
             this->consumer = consumer;
         }
 
+        virtual ~TransactionSynhcronization() {}
+
         virtual void beforeEnd() throw( exceptions::ActiveMQException ) {
             consumer->acknowledge();
             consumer->setSynchronizationRegistered( false );
@@ -83,6 +85,38 @@
 
     };
 
+    class CloseSynhcronization : public Synchronization {
+    private:
+
+        ActiveMQConsumer* consumer;
+
+    public:
+
+        CloseSynhcronization( ActiveMQConsumer* consumer ) {
+
+            if( consumer == NULL ) {
+                throw NullPointerException(
+                    __FILE__, __LINE__, "Synchronization Created with NULL Consumer.");
+            }
+
+            this->consumer = consumer;
+        }
+
+        virtual ~CloseSynhcronization() {}
+
+        virtual void beforeEnd() throw( exceptions::ActiveMQException ) {
+        }
+
+        virtual void afterCommit() throw( exceptions::ActiveMQException ) {
+            consumer->doClose();
+        }
+
+        virtual void afterRollback() throw( exceptions::ActiveMQException ) {
+            consumer->doClose();
+        }
+
+    };
+
 }}
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -120,9 +154,39 @@
     throw ( cms::CMSException ) {
 
     try{
+        if( !closed ) {
+            if( this->transaction != NULL && this->transaction->isInTransaction() ) {
+
+                // TODO - Currently we can do this since the consumer could be
+                // deleted right after the close call so it won't stick around
+                // long enough to clean up the transaction data.  For now we
+                // just have to close badly.
+                //
+                //Pointer<Synchronization> sync( new CloseSynhcronization( this ) );
+                //this->transaction->addSynchronization( sync );
+                doClose();
+
+            } else {
+                doClose();
+            }
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumer::doClose() throw ( ActiveMQException ) {
+
+    try {
 
         if( !this->isClosed() ) {
 
+            if( !session->isTransacted() ) {
+                deliverAcks();
+            }
+
             // Remove this Consumer from the Connections set of Dispatchers and then
             // remove it from the Broker.
             this->session->disposeOf( this->consumerInfo->getConsumerId() );
@@ -461,6 +525,47 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumer::deliverAcks()
+    throw ( ActiveMQException ) {
+
+    try{
+
+        Pointer<MessageAck> ack;
+
+        if( this->deliveringAcks.compareAndSet( false, true ) ) {
+
+            if( this->session->isAutoAcknowledge() ) {
+
+                synchronized( &dispatchedMessages ) {
+                    ack = makeAckForAllDeliveredMessages( ActiveMQConstants::ACK_TYPE_CONSUMED );
+                    if( ack != NULL ) {
+                        dispatchedMessages.clear();
+                    }
+                }
+
+            } else if( pendingAck != NULL &&
+                       pendingAck->getAckType() == ActiveMQConstants::ACK_TYPE_CONSUMED ) {
+
+                ack = pendingAck;
+            }
+
+            if( ack != NULL ) {
+
+                try{
+                    this->session->oneway( ack );
+                } catch(...) {}
+
+            } else {
+                this->deliveringAcks.set( false );
+            }
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumer::ackLater( const Pointer<Message>& message, int ackType )
     throw ( ActiveMQException ) {
 
@@ -471,7 +576,7 @@
         if( !synchronizationRegistered ) {
             synchronizationRegistered = true;
 
-            Pointer<Synchronization> sync( new ConsumerSynhcronization( this ) );
+            Pointer<Synchronization> sync( new TransactionSynhcronization( this ) );
             this->transaction->addSynchronization( sync );
         }
     }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h?rev=761316&r1=761315&r2=761316&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h Thu Apr  2 14:49:54 2009
@@ -229,6 +229,12 @@
         void rollback() throw ( exceptions::ActiveMQException );
 
         /**
+         * Performs the actual close operation on this consumer
+         * @throw ActiveMQException
+         */
+        void doClose() throw ( exceptions::ActiveMQException );
+
+        /**
          * Get the Consumer information for this consumer
          * @return Reference to a Consumer Info Object
          */
@@ -331,6 +337,9 @@
         void ackLater( const Pointer<commands::Message>& message, int ackType )
             throw ( exceptions::ActiveMQException );
 
+        // Delivers all pending acks before a consumer is closed
+        void deliverAcks() throw ( exceptions::ActiveMQException );
+
         // Create an Ack Message that acks all messages that have been delivered so far.
         Pointer<commands::MessageAck> makeAckForAllDeliveredMessages( int type );
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?rev=761316&r1=761315&r2=761316&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Thu Apr  2 14:49:54 2009
@@ -116,10 +116,10 @@
         // Stop the dispatch executor.
         stop();
 
-        // TODO = Commit it first.  ??
-        // Destroy the Transaction
+        // Roll Back the transaction since we were closed without an explicit call
+        // to commit it.
         if( this->transaction.get() != NULL && this->transaction->isInTransaction() ){
-            this->transaction->commit();
+            this->transaction->rollback();
         }
 
         // Close all Consumers