You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2009/04/02 16:49:54 UTC
svn commit: r761316 - in
/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core:
ActiveMQConsumer.cpp ActiveMQConsumer.h ActiveMQSession.cpp
Author: tabish
Date: Thu Apr 2 14:49:54 2009
New Revision: 761316
URL: http://svn.apache.org/viewvc?rev=761316&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQCPP-209
Fixed the issues I was seeing at shutdown, not perfect but its a start, a real solution is going to require more redesign of the shutdown logic.
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp?rev=761316&r1=761315&r2=761316&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp Thu Apr 2 14:49:54 2009
@@ -49,14 +49,14 @@
namespace activemq{
namespace core {
- class ConsumerSynhcronization : public Synchronization {
+ class TransactionSynhcronization : public Synchronization {
private:
ActiveMQConsumer* consumer;
public:
- ConsumerSynhcronization( ActiveMQConsumer* consumer ) {
+ TransactionSynhcronization( ActiveMQConsumer* consumer ) {
if( consumer == NULL ) {
throw NullPointerException(
@@ -66,6 +66,8 @@
this->consumer = consumer;
}
+ virtual ~TransactionSynhcronization() {}
+
virtual void beforeEnd() throw( exceptions::ActiveMQException ) {
consumer->acknowledge();
consumer->setSynchronizationRegistered( false );
@@ -83,6 +85,38 @@
};
+ class CloseSynhcronization : public Synchronization {
+ private:
+
+ ActiveMQConsumer* consumer;
+
+ public:
+
+ CloseSynhcronization( ActiveMQConsumer* consumer ) {
+
+ if( consumer == NULL ) {
+ throw NullPointerException(
+ __FILE__, __LINE__, "Synchronization Created with NULL Consumer.");
+ }
+
+ this->consumer = consumer;
+ }
+
+ virtual ~CloseSynhcronization() {}
+
+ virtual void beforeEnd() throw( exceptions::ActiveMQException ) {
+ }
+
+ virtual void afterCommit() throw( exceptions::ActiveMQException ) {
+ consumer->doClose();
+ }
+
+ virtual void afterRollback() throw( exceptions::ActiveMQException ) {
+ consumer->doClose();
+ }
+
+ };
+
}}
////////////////////////////////////////////////////////////////////////////////
@@ -120,9 +154,39 @@
throw ( cms::CMSException ) {
try{
+ if( !closed ) {
+ if( this->transaction != NULL && this->transaction->isInTransaction() ) {
+
+ // TODO - Currently we can do this since the consumer could be
+ // deleted right after the close call so it won't stick around
+ // long enough to clean up the transaction data. For now we
+ // just have to close badly.
+ //
+ //Pointer<Synchronization> sync( new CloseSynhcronization( this ) );
+ //this->transaction->addSynchronization( sync );
+ doClose();
+
+ } else {
+ doClose();
+ }
+ }
+ }
+ AMQ_CATCH_RETHROW( ActiveMQException )
+ AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+ AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumer::doClose() throw ( ActiveMQException ) {
+
+ try {
if( !this->isClosed() ) {
+ if( !session->isTransacted() ) {
+ deliverAcks();
+ }
+
// Remove this Consumer from the Connections set of Dispatchers and then
// remove it from the Broker.
this->session->disposeOf( this->consumerInfo->getConsumerId() );
@@ -461,6 +525,47 @@
}
////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumer::deliverAcks()
+ throw ( ActiveMQException ) {
+
+ try{
+
+ Pointer<MessageAck> ack;
+
+ if( this->deliveringAcks.compareAndSet( false, true ) ) {
+
+ if( this->session->isAutoAcknowledge() ) {
+
+ synchronized( &dispatchedMessages ) {
+ ack = makeAckForAllDeliveredMessages( ActiveMQConstants::ACK_TYPE_CONSUMED );
+ if( ack != NULL ) {
+ dispatchedMessages.clear();
+ }
+ }
+
+ } else if( pendingAck != NULL &&
+ pendingAck->getAckType() == ActiveMQConstants::ACK_TYPE_CONSUMED ) {
+
+ ack = pendingAck;
+ }
+
+ if( ack != NULL ) {
+
+ try{
+ this->session->oneway( ack );
+ } catch(...) {}
+
+ } else {
+ this->deliveringAcks.set( false );
+ }
+ }
+ }
+ AMQ_CATCH_RETHROW( ActiveMQException )
+ AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+ AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::ackLater( const Pointer<Message>& message, int ackType )
throw ( ActiveMQException ) {
@@ -471,7 +576,7 @@
if( !synchronizationRegistered ) {
synchronizationRegistered = true;
- Pointer<Synchronization> sync( new ConsumerSynhcronization( this ) );
+ Pointer<Synchronization> sync( new TransactionSynhcronization( this ) );
this->transaction->addSynchronization( sync );
}
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h?rev=761316&r1=761315&r2=761316&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h Thu Apr 2 14:49:54 2009
@@ -229,6 +229,12 @@
void rollback() throw ( exceptions::ActiveMQException );
/**
+ * Performs the actual close operation on this consumer
+ * @throw ActiveMQException
+ */
+ void doClose() throw ( exceptions::ActiveMQException );
+
+ /**
* Get the Consumer information for this consumer
* @return Reference to a Consumer Info Object
*/
@@ -331,6 +337,9 @@
void ackLater( const Pointer<commands::Message>& message, int ackType )
throw ( exceptions::ActiveMQException );
+ // Delivers all pending acks before a consumer is closed
+ void deliverAcks() throw ( exceptions::ActiveMQException );
+
// Create an Ack Message that acks all messages that have been delivered so far.
Pointer<commands::MessageAck> makeAckForAllDeliveredMessages( int type );
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?rev=761316&r1=761315&r2=761316&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Thu Apr 2 14:49:54 2009
@@ -116,10 +116,10 @@
// Stop the dispatch executor.
stop();
- // TODO = Commit it first. ??
- // Destroy the Transaction
+ // Roll Back the transaction since we were closed without an explicit call
+ // to commit it.
if( this->transaction.get() != NULL && this->transaction->isInTransaction() ){
- this->transaction->commit();
+ this->transaction->rollback();
}
// Close all Consumers