You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/06/07 20:57:39 UTC

svn commit: r952365 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq: core/ActiveMQConnection.cpp core/ActiveMQConsumer.cpp core/ActiveMQConsumer.h core/ActiveMQSession.cpp transport/failover/FailoverTransport.cpp

Author: tabish
Date: Mon Jun  7 18:57:39 2010
New Revision: 952365

URL: http://svn.apache.org/viewvc?rev=952365&view=rev
Log:
Fixes for consumer message purge during failover processing.

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=952365&r1=952364&r2=952365&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Mon Jun  7 18:57:39 2010
@@ -32,6 +32,7 @@
 
 #include <decaf/lang/Math.h>
 #include <decaf/lang/Boolean.h>
+#include <decaf/lang/Integer.h>
 #include <decaf/util/Iterator.h>
 #include <decaf/util/UUID.h>
 #include <decaf/util/concurrent/Mutex.h>
@@ -877,13 +878,15 @@ void ActiveMQConnection::removeTransport
 void ActiveMQConnection::waitForTransportInterruptionProcessingToComplete()
     throw( decaf::lang::exceptions::InterruptedException ) {
 
-    if( this->config->transportInterruptionProcessingComplete != NULL ) {
+    Pointer<CountDownLatch> cdl = this->config->transportInterruptionProcessingComplete;
+    if( cdl != NULL ) {
 
-        while( !closed.get() && !transportFailed.get() &&
-               !this->config->transportInterruptionProcessingComplete->await( 15, TimeUnit::SECONDS) ) {
+        while( !closed.get() && !transportFailed.get() && cdl->getCount() > 0 ) {
 
-            //LOG.warn( "dispatch paused, waiting for outstanding dispatch interruption processing (" +
-            //          transportInterruptionProcessingComplete.getCount() + ") to complete..");
+            std::cout << "dispatch paused, waiting for outstanding dispatch interruption processing ("
+                      << Integer::toString( cdl->getCount() ) << ") to complete.." << std::endl;
+
+            cdl->await( 10, TimeUnit::SECONDS );
         }
 
         signalInterruptionProcessingComplete();
@@ -893,15 +896,15 @@ void ActiveMQConnection::waitForTranspor
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnection::setTransportInterruptionProcessingComplete() {
 
-    synchronized( &( this->config->mutex ) ) {
+    Pointer<CountDownLatch> cdl = this->config->transportInterruptionProcessingComplete;
+    if( cdl != NULL ) {
 
-        if( this->config->transportInterruptionProcessingComplete != NULL ) {
-            this->config->transportInterruptionProcessingComplete->countDown();
+        std::cout << "Set Transport interruption processing complete." << std::endl;
+        cdl->countDown();
 
-            try {
-                signalInterruptionProcessingComplete();
-            } catch( InterruptedException& ignored ) {}
-        }
+        try {
+            signalInterruptionProcessingComplete();
+        } catch( InterruptedException& ignored ) {}
     }
 }
 
@@ -909,21 +912,23 @@ void ActiveMQConnection::setTransportInt
 void ActiveMQConnection::signalInterruptionProcessingComplete()
     throw( decaf::lang::exceptions::InterruptedException ) {
 
-    if( this->config->transportInterruptionProcessingComplete->await( 0, TimeUnit::SECONDS ) ) {
-        synchronized( &( this->config->mutex ) ) {
+    Pointer<CountDownLatch> cdl = this->config->transportInterruptionProcessingComplete;
 
-            this->config->transportInterruptionProcessingComplete.reset( NULL );
-            FailoverTransport* failoverTransport =
-                dynamic_cast<FailoverTransport*>( this->config->transport->narrow( typeid( FailoverTransport ) ) );
-
-            if( failoverTransport != NULL ) {
-                failoverTransport->setConnectionInterruptProcessingComplete(
-                    this->config->connectionInfo->getConnectionId() );
-
-                //if( LOG.isDebugEnabled() ) {
-                //    LOG.debug("transportInterruptionProcessingComplete for: " + this.getConnectionInfo().getConnectionId());
-                //}
-            }
+    if( cdl->getCount() == 0 ) {
+
+        std::cout << "Signaling Transport interruption processing complete." << std::endl;
+
+        this->config->transportInterruptionProcessingComplete.reset( NULL );
+        FailoverTransport* failoverTransport =
+            dynamic_cast<FailoverTransport*>( this->config->transport->narrow( typeid( FailoverTransport ) ) );
+
+        if( failoverTransport != NULL ) {
+            failoverTransport->setConnectionInterruptProcessingComplete(
+                this->config->connectionInfo->getConnectionId() );
+
+            //if( LOG.isDebugEnabled() ) {
+            //    LOG.debug("transportInterruptionProcessingComplete for: " + this.getConnectionInfo().getConnectionId());
+            //}
         }
     }
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp?rev=952365&r1=952364&r2=952365&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp Mon Jun  7 18:57:39 2010
@@ -242,6 +242,7 @@ ActiveMQConsumer::ActiveMQConsumer( Acti
     this->additionalWindowSize = 0;
     this->deliveredCounter = 0;
     this->clearDispatchList = false;
+    this->inProgressClearRequiredFlag = false;
     this->listener = NULL;
     this->redeliveryDelay = 0;
     this->redeliveryPolicy.reset( this->session->getConnection()->getRedeliveryPolicy()->clone() );
@@ -985,6 +986,7 @@ void ActiveMQConsumer::dispatch( const P
 
         synchronized( &unconsumedMessages ) {
 
+            clearMessagesInProgress();
             if( this->clearDispatchList ) {
                 // we are reconnecting so lets flush the in progress
                 // messages
@@ -1095,18 +1097,30 @@ bool ActiveMQConsumer::iterate() {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::clearMessagesInProgress() {
-    // we are called from inside the transport reconnection logic
-    // which involves us clearing all the connections' consumers
-    // dispatch lists and clearing them
-    // so rather than trying to grab a mutex (which could be already
-    // owned by the message listener calling the send) we will just set
-    // a flag so that the list can be cleared as soon as the
-    // dispatch thread is ready to flush the dispatch list
+void ActiveMQConsumer::inProgressClearRequired() {
+
+    inProgressClearRequiredFlag = true;
+    // Clears dispatched messages async to avoid lock contention with inprogress acks.
     clearDispatchList = true;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumer::clearMessagesInProgress() {
+    if( inProgressClearRequiredFlag ) {
+        synchronized( &unconsumedMessages ) {
+            if( inProgressClearRequiredFlag ) {
+
+                // TODO - Rollback duplicates.
+
+                // allow dispatch on this connection to resume
+                this->session->getConnection()->setTransportInterruptionProcessingComplete();
+                inProgressClearRequiredFlag = false;
+            }
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
 bool ActiveMQConsumer::isAutoAcknowledgeEach() const {
     return this->session->isAutoAcknowledge() ||
            ( this->session->isDupsOkAcknowledge() && this->consumerInfo->getDestination()->isQueue() );

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h?rev=952365&r1=952364&r2=952365&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h Mon Jun  7 18:57:39 2010
@@ -122,6 +122,11 @@ namespace core{
         bool clearDispatchList;
 
         /**
+         * Indicates if inprogress messages are to be cleared.
+         */
+        bool inProgressClearRequiredFlag;
+
+        /**
          * The redelivery delay used for the last set of redeliveries.
          */
         long long redeliveryDelay;
@@ -327,6 +332,12 @@ namespace core{
         void clearMessagesInProgress();
 
         /**
+         * Signals that a Failure occurred and that anything in-progress in the
+         * consumer should be cleared.
+         */
+        void inProgressClearRequired();
+
+        /**
          * Gets the currently set Last Delivered Sequence Id
          *
          * @returns long long containing the sequence id of the last delivered Message.

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?rev=952365&r1=952364&r2=952365&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Mon Jun  7 18:57:39 2010
@@ -234,6 +234,8 @@ void ActiveMQSession::clearMessagesInPro
 
         std::vector< ActiveMQConsumer* >::iterator iter = consumers.begin();
         for( ; iter != consumers.end(); ++iter ) {
+            (*iter)->inProgressClearRequired();
+            // Todo - This should occur asynchronously.
             (*iter)->clearMessagesInProgress();
         }
     }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp?rev=952365&r1=952364&r2=952365&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp Mon Jun  7 18:57:39 2010
@@ -467,15 +467,15 @@ void FailoverTransport::handleTransportF
             connectedTransportURI.reset( NULL );
             connected = false;
 
+            // Place the State Tracker into a reconnection state.
+            this->stateTracker.transportInterrupted();
+
             // Notify before we attempt to reconnect so that the consumers have a chance
             // to cleanup their state.
             if( transportListener != NULL ) {
                 transportListener->transportInterrupted();
             }
 
-            // Place the State Tracker into a reconnection state.
-            this->stateTracker.transportInterrupted();
-
             if( started ) {
                 taskRunner->wakeup();
             }