You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2009/03/30 17:24:54 UTC
svn commit: r760007 - in
/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core:
ActiveMQConsumer.cpp ActiveMQConsumer.h ActiveMQSession.cpp ActiveMQSession.h
Author: tabish
Date: Mon Mar 30 15:24:54 2009
New Revision: 760007
URL: http://svn.apache.org/viewvc?rev=760007&view=rev
Log:
Tweak the Session and Consumer objects to improve thread safety and to prepare for fixes to the Transactions support, which is currently broken.
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp?rev=760007&r1=760006&r2=760007&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp Mon Mar 30 15:24:54 2009
@@ -46,7 +46,7 @@
////////////////////////////////////////////////////////////////////////////////
ActiveMQConsumer::ActiveMQConsumer( const Pointer<ConsumerInfo>& consumerInfo,
ActiveMQSession* session,
- ActiveMQTransactionContext* transaction ) {
+ const Pointer<ActiveMQTransactionContext>& transaction ) {
if( session == NULL || consumerInfo == NULL ) {
throw ActiveMQException(
@@ -58,8 +58,8 @@
this->session = session;
this->transaction = transaction;
this->consumerInfo = consumerInfo;
- this->listener = NULL;
this->closed = false;
+ this->lastDeliveredSequenceId = 0;
}
////////////////////////////////////////////////////////////////////////////////
@@ -312,9 +312,13 @@
this->checkClosed();
- this->listener = listener;
+ if( this->consumerInfo->getPrefetchSize() == 0 && listener != NULL ) {
+ throw ActiveMQException(
+ __FILE__, __LINE__,
+ "Cannot deliver async when Prefetch is Zero, set Prefecth to at least One.");
+ }
- if( listener != NULL && session != NULL ) {
+ if( listener != NULL ) {
// Now that we have a valid message listener,
// redispatch all the messages that it missed.
@@ -324,11 +328,15 @@
session->stop();
}
+ this->listener.set( listener );
+
session->redispatch( unconsumedMessages );
if( wasStarted ) {
session->start();
}
+ } else {
+ this->listener.set( NULL );
}
}
AMQ_CATCH_RETHROW( ActiveMQException )
@@ -349,6 +357,8 @@
message->setAckHandler( this );
}
+ this->lastDeliveredSequenceId = message->getMessageId()->getBrokerSequenceId();
+
// If the session is transacted then we hand off the message to it to
// be stored for later redelivery. We do need to check and see if we
// are approaching the prefetch limit and send a Delivered ack just so
@@ -460,13 +470,13 @@
}
// If we have a listener, send the message.
- if( listener != NULL ) {
+ if( this->listener.get() != NULL ) {
// Preprocessing.
beforeMessageIsConsumed( message );
// Notify the listener
- listener->onMessage( dynamic_cast<cms::Message*>( message.get() ) );
+ this->listener.get()->onMessage( dynamic_cast<cms::Message*>( message.get() ) );
// Postprocessing
afterMessageIsConsumed( message, false );
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h?rev=760007&r1=760006&r2=760007&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h Mon Mar 30 15:24:54 2009
@@ -26,8 +26,10 @@
#include <activemq/exceptions/ActiveMQException.h>
#include <activemq/commands/ConsumerInfo.h>
#include <activemq/core/ActiveMQAckHandler.h>
+#include <activemq/core/ActiveMQTransactionContext.h>
#include <activemq/core/Dispatcher.h>
+#include <decaf/util/concurrent/atomic/AtomicReference.h>
#include <decaf/lang/Pointer.h>
#include <decaf/util/StlQueue.h>
#include <decaf/util/concurrent/Mutex.h>
@@ -37,9 +39,9 @@
namespace core{
using decaf::lang::Pointer;
+ using decaf::util::concurrent::atomic::AtomicReference;
class ActiveMQSession;
- class ActiveMQTransactionContext;
class AMQCPP_API ActiveMQConsumer :
public cms::MessageConsumer,
@@ -56,7 +58,7 @@
/**
* The Transaction Context, null if not in a Transacted Session.
*/
- ActiveMQTransactionContext* transaction;
+ Pointer<ActiveMQTransactionContext> transaction;
/**
* The Consumer info for this Consumer
@@ -66,7 +68,7 @@
/**
* The Message Listener for this Consumer
*/
- cms::MessageListener* listener;
+ AtomicReference<cms::MessageListener> listener;
/**
* Queue of unconsumed messages.
@@ -79,6 +81,11 @@
decaf::util::StlQueue< decaf::lang::Pointer<commands::Message> > dispatchedMessages;
/**
+ * The last delivered message's BrokerSequenceId.
+ */
+ long long lastDeliveredSequenceId;
+
+ /**
* Boolean that indicates if the consumer has been closed
*/
bool closed;
@@ -90,7 +97,7 @@
*/
ActiveMQConsumer( const Pointer<commands::ConsumerInfo>& consumerInfo,
ActiveMQSession* session,
- ActiveMQTransactionContext* transaction );
+ const Pointer<ActiveMQTransactionContext>& transaction );
virtual ~ActiveMQConsumer();
@@ -140,7 +147,7 @@
* @param MessageListener interface pointer
*/
virtual cms::MessageListener* getMessageListener() const {
- return this->listener;
+ return this->listener.get();
}
/**
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?rev=760007&r1=760006&r2=760007&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Mon Mar 30 15:24:54 2009
@@ -266,7 +266,7 @@
// Create the consumer instance.
std::auto_ptr<ActiveMQConsumer> consumer(
- new ActiveMQConsumer( consumerInfo, this, this->transaction.get() ) );
+ new ActiveMQConsumer( consumerInfo, this, this->transaction ) );
// Add the consumer to the map.
synchronized( &this->consumers ) {
@@ -311,7 +311,7 @@
// Create the consumer instance.
std::auto_ptr<ActiveMQConsumer> consumer(
- new ActiveMQConsumer( consumerInfo, this, this->transaction.get() ) );
+ new ActiveMQConsumer( consumerInfo, this, this->transaction ) );
// Add the consumer to the map.
synchronized( &this->consumers ) {
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h?rev=760007&r1=760006&r2=760007&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h Mon Mar 30 15:24:54 2009
@@ -72,7 +72,7 @@
/**
* Transaction Management object
*/
- std::auto_ptr<ActiveMQTransactionContext> transaction;
+ Pointer<ActiveMQTransactionContext> transaction;
/**
* Connection