You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/06/07 20:57:39 UTC
svn commit: r952365 - in
/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq:
core/ActiveMQConnection.cpp core/ActiveMQConsumer.cpp
core/ActiveMQConsumer.h core/ActiveMQSession.cpp
transport/failover/FailoverTransport.cpp
Author: tabish
Date: Mon Jun 7 18:57:39 2010
New Revision: 952365
URL: http://svn.apache.org/viewvc?rev=952365&view=rev
Log:
Fixes for consumer message purge during failover processing.
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=952365&r1=952364&r2=952365&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Mon Jun 7 18:57:39 2010
@@ -32,6 +32,7 @@
#include <decaf/lang/Math.h>
#include <decaf/lang/Boolean.h>
+#include <decaf/lang/Integer.h>
#include <decaf/util/Iterator.h>
#include <decaf/util/UUID.h>
#include <decaf/util/concurrent/Mutex.h>
@@ -877,13 +878,15 @@ void ActiveMQConnection::removeTransport
void ActiveMQConnection::waitForTransportInterruptionProcessingToComplete()
throw( decaf::lang::exceptions::InterruptedException ) {
- if( this->config->transportInterruptionProcessingComplete != NULL ) {
+ Pointer<CountDownLatch> cdl = this->config->transportInterruptionProcessingComplete;
+ if( cdl != NULL ) {
- while( !closed.get() && !transportFailed.get() &&
- !this->config->transportInterruptionProcessingComplete->await( 15, TimeUnit::SECONDS) ) {
+ while( !closed.get() && !transportFailed.get() && cdl->getCount() > 0 ) {
- //LOG.warn( "dispatch paused, waiting for outstanding dispatch interruption processing (" +
- // transportInterruptionProcessingComplete.getCount() + ") to complete..");
+ std::cout << "dispatch paused, waiting for outstanding dispatch interruption processing ("
+ << Integer::toString( cdl->getCount() ) << ") to complete.." << std::endl;
+
+ cdl->await( 10, TimeUnit::SECONDS );
}
signalInterruptionProcessingComplete();
@@ -893,15 +896,15 @@ void ActiveMQConnection::waitForTranspor
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setTransportInterruptionProcessingComplete() {
- synchronized( &( this->config->mutex ) ) {
+ Pointer<CountDownLatch> cdl = this->config->transportInterruptionProcessingComplete;
+ if( cdl != NULL ) {
- if( this->config->transportInterruptionProcessingComplete != NULL ) {
- this->config->transportInterruptionProcessingComplete->countDown();
+ std::cout << "Set Transport interruption processing complete." << std::endl;
+ cdl->countDown();
- try {
- signalInterruptionProcessingComplete();
- } catch( InterruptedException& ignored ) {}
- }
+ try {
+ signalInterruptionProcessingComplete();
+ } catch( InterruptedException& ignored ) {}
}
}
@@ -909,21 +912,23 @@ void ActiveMQConnection::setTransportInt
void ActiveMQConnection::signalInterruptionProcessingComplete()
throw( decaf::lang::exceptions::InterruptedException ) {
- if( this->config->transportInterruptionProcessingComplete->await( 0, TimeUnit::SECONDS ) ) {
- synchronized( &( this->config->mutex ) ) {
+ Pointer<CountDownLatch> cdl = this->config->transportInterruptionProcessingComplete;
- this->config->transportInterruptionProcessingComplete.reset( NULL );
- FailoverTransport* failoverTransport =
- dynamic_cast<FailoverTransport*>( this->config->transport->narrow( typeid( FailoverTransport ) ) );
-
- if( failoverTransport != NULL ) {
- failoverTransport->setConnectionInterruptProcessingComplete(
- this->config->connectionInfo->getConnectionId() );
-
- //if( LOG.isDebugEnabled() ) {
- // LOG.debug("transportInterruptionProcessingComplete for: " + this.getConnectionInfo().getConnectionId());
- //}
- }
+ if( cdl->getCount() == 0 ) {
+
+ std::cout << "Signaling Transport interruption processing complete." << std::endl;
+
+ this->config->transportInterruptionProcessingComplete.reset( NULL );
+ FailoverTransport* failoverTransport =
+ dynamic_cast<FailoverTransport*>( this->config->transport->narrow( typeid( FailoverTransport ) ) );
+
+ if( failoverTransport != NULL ) {
+ failoverTransport->setConnectionInterruptProcessingComplete(
+ this->config->connectionInfo->getConnectionId() );
+
+ //if( LOG.isDebugEnabled() ) {
+ // LOG.debug("transportInterruptionProcessingComplete for: " + this.getConnectionInfo().getConnectionId());
+ //}
}
}
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp?rev=952365&r1=952364&r2=952365&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp Mon Jun 7 18:57:39 2010
@@ -242,6 +242,7 @@ ActiveMQConsumer::ActiveMQConsumer( Acti
this->additionalWindowSize = 0;
this->deliveredCounter = 0;
this->clearDispatchList = false;
+ this->inProgressClearRequiredFlag = false;
this->listener = NULL;
this->redeliveryDelay = 0;
this->redeliveryPolicy.reset( this->session->getConnection()->getRedeliveryPolicy()->clone() );
@@ -985,6 +986,7 @@ void ActiveMQConsumer::dispatch( const P
synchronized( &unconsumedMessages ) {
+ clearMessagesInProgress();
if( this->clearDispatchList ) {
// we are reconnecting so lets flush the in progress
// messages
@@ -1095,18 +1097,30 @@ bool ActiveMQConsumer::iterate() {
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::clearMessagesInProgress() {
- // we are called from inside the transport reconnection logic
- // which involves us clearing all the connections' consumers
- // dispatch lists and clearing them
- // so rather than trying to grab a mutex (which could be already
- // owned by the message listener calling the send) we will just set
- // a flag so that the list can be cleared as soon as the
- // dispatch thread is ready to flush the dispatch list
+void ActiveMQConsumer::inProgressClearRequired() {
+
+ inProgressClearRequiredFlag = true;
+ // Clears dispatched messages async to avoid lock contention with inprogress acks.
clearDispatchList = true;
}
////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumer::clearMessagesInProgress() {
+ if( inProgressClearRequiredFlag ) {
+ synchronized( &unconsumedMessages ) {
+ if( inProgressClearRequiredFlag ) {
+
+ // TODO - Rollback duplicates.
+
+ // allow dispatch on this connection to resume
+ this->session->getConnection()->setTransportInterruptionProcessingComplete();
+ inProgressClearRequiredFlag = false;
+ }
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConsumer::isAutoAcknowledgeEach() const {
return this->session->isAutoAcknowledge() ||
( this->session->isDupsOkAcknowledge() && this->consumerInfo->getDestination()->isQueue() );
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h?rev=952365&r1=952364&r2=952365&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h Mon Jun 7 18:57:39 2010
@@ -122,6 +122,11 @@ namespace core{
bool clearDispatchList;
/**
+ * Indicates if inprogress messages are to be cleared.
+ */
+ bool inProgressClearRequiredFlag;
+
+ /**
* The redelivery delay used for the last set of redeliveries.
*/
long long redeliveryDelay;
@@ -327,6 +332,12 @@ namespace core{
void clearMessagesInProgress();
/**
+ * Signals that a Failure occurred and that anything in-progress in the
+ * consumer should be cleared.
+ */
+ void inProgressClearRequired();
+
+ /**
* Gets the currently set Last Delivered Sequence Id
*
* @returns long long containing the sequence id of the last delivered Message.
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?rev=952365&r1=952364&r2=952365&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Mon Jun 7 18:57:39 2010
@@ -234,6 +234,8 @@ void ActiveMQSession::clearMessagesInPro
std::vector< ActiveMQConsumer* >::iterator iter = consumers.begin();
for( ; iter != consumers.end(); ++iter ) {
+ (*iter)->inProgressClearRequired();
+ // Todo - This should occur asynchronously.
(*iter)->clearMessagesInProgress();
}
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp?rev=952365&r1=952364&r2=952365&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp Mon Jun 7 18:57:39 2010
@@ -467,15 +467,15 @@ void FailoverTransport::handleTransportF
connectedTransportURI.reset( NULL );
connected = false;
+ // Place the State Tracker into a reconnection state.
+ this->stateTracker.transportInterrupted();
+
// Notify before we attempt to reconnect so that the consumers have a chance
// to cleanup their state.
if( transportListener != NULL ) {
transportListener->transportInterrupted();
}
- // Place the State Tracker into a reconnection state.
- this->stateTracker.transportInterrupted();
-
if( started ) {
taskRunner->wakeup();
}