You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2009/03/30 17:24:54 UTC

svn commit: r760007 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core: ActiveMQConsumer.cpp ActiveMQConsumer.h ActiveMQSession.cpp ActiveMQSession.h

Author: tabish
Date: Mon Mar 30 15:24:54 2009
New Revision: 760007

URL: http://svn.apache.org/viewvc?rev=760007&view=rev
Log:
Tweak the Session and Consumer objects to increase thread safety and prep for fixes for the Transactions support that is currently broken.

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp?rev=760007&r1=760006&r2=760007&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp Mon Mar 30 15:24:54 2009
@@ -46,7 +46,7 @@
 ////////////////////////////////////////////////////////////////////////////////
 ActiveMQConsumer::ActiveMQConsumer( const Pointer<ConsumerInfo>& consumerInfo,
                                     ActiveMQSession* session,
-                                    ActiveMQTransactionContext* transaction ) {
+                                    const Pointer<ActiveMQTransactionContext>& transaction ) {
 
     if( session == NULL || consumerInfo == NULL ) {
         throw ActiveMQException(
@@ -58,8 +58,8 @@
     this->session = session;
     this->transaction = transaction;
     this->consumerInfo = consumerInfo;
-    this->listener = NULL;
     this->closed = false;
+    this->lastDeliveredSequenceId = 0;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -312,9 +312,13 @@
 
         this->checkClosed();
 
-        this->listener = listener;
+        if( this->consumerInfo->getPrefetchSize() == 0 && listener != NULL ) {
+            throw ActiveMQException(
+                __FILE__, __LINE__,
+                "Cannot deliver async when Prefetch is Zero, set Prefecth to at least One.");
+        }
 
-        if( listener != NULL && session != NULL ) {
+        if( listener != NULL ) {
 
             // Now that we have a valid message listener,
             // redispatch all the messages that it missed.
@@ -324,11 +328,15 @@
                 session->stop();
             }
 
+            this->listener.set( listener );
+
             session->redispatch( unconsumedMessages );
 
             if( wasStarted ) {
                 session->start();
             }
+        } else {
+            this->listener.set( NULL );
         }
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
@@ -349,6 +357,8 @@
         message->setAckHandler( this );
     }
 
+    this->lastDeliveredSequenceId = message->getMessageId()->getBrokerSequenceId();
+
     // If the session is transacted then we hand off the message to it to
     // be stored for later redelivery.  We do need to check and see if we
     // are approaching the prefetch limit and send an Delivered ack just so
@@ -460,13 +470,13 @@
         }
 
         // If we have a listener, send the message.
-        if( listener != NULL ) {
+        if( this->listener.get() != NULL ) {
 
             // Preprocessing.
             beforeMessageIsConsumed( message );
 
             // Notify the listener
-            listener->onMessage( dynamic_cast<cms::Message*>( message.get() ) );
+            this->listener.get()->onMessage( dynamic_cast<cms::Message*>( message.get() ) );
 
             // Postprocessing
             afterMessageIsConsumed( message, false );

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h?rev=760007&r1=760006&r2=760007&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h Mon Mar 30 15:24:54 2009
@@ -26,8 +26,10 @@
 #include <activemq/exceptions/ActiveMQException.h>
 #include <activemq/commands/ConsumerInfo.h>
 #include <activemq/core/ActiveMQAckHandler.h>
+#include <activemq/core/ActiveMQTransactionContext.h>
 #include <activemq/core/Dispatcher.h>
 
+#include <decaf/util/concurrent/atomic/AtomicReference.h>
 #include <decaf/lang/Pointer.h>
 #include <decaf/util/StlQueue.h>
 #include <decaf/util/concurrent/Mutex.h>
@@ -37,9 +39,9 @@
 namespace core{
 
     using decaf::lang::Pointer;
+    using decaf::util::concurrent::atomic::AtomicReference;
 
     class ActiveMQSession;
-    class ActiveMQTransactionContext;
 
     class AMQCPP_API ActiveMQConsumer :
         public cms::MessageConsumer,
@@ -56,7 +58,7 @@
         /**
          * The Transaction Context, null if not in a Transacted Session.
          */
-        ActiveMQTransactionContext* transaction;
+        Pointer<ActiveMQTransactionContext> transaction;
 
         /**
          * The Consumer info for this Consumer
@@ -66,7 +68,7 @@
         /**
          * The Message Listener for this Consumer
          */
-        cms::MessageListener* listener;
+        AtomicReference<cms::MessageListener> listener;
 
         /**
          * Queue of unconsumed messages.
@@ -79,6 +81,11 @@
         decaf::util::StlQueue< decaf::lang::Pointer<commands::Message> > dispatchedMessages;
 
         /**
+         * The last delivered message's BrokerSequenceId.
+         */
+        long long lastDeliveredSequenceId;
+
+        /**
          * Boolean that indicates if the consumer has been closed
          */
         bool closed;
@@ -90,7 +97,7 @@
          */
         ActiveMQConsumer( const Pointer<commands::ConsumerInfo>& consumerInfo,
                           ActiveMQSession* session,
-                          ActiveMQTransactionContext* transaction );
+                          const Pointer<ActiveMQTransactionContext>& transaction );
 
         virtual ~ActiveMQConsumer();
 
@@ -140,7 +147,7 @@
          * @param MessageListener interface pointer
          */
         virtual cms::MessageListener* getMessageListener() const {
-            return this->listener;
+            return this->listener.get();
         }
 
         /**

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?rev=760007&r1=760006&r2=760007&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Mon Mar 30 15:24:54 2009
@@ -266,7 +266,7 @@
 
         // Create the consumer instance.
         std::auto_ptr<ActiveMQConsumer> consumer(
-            new ActiveMQConsumer( consumerInfo, this, this->transaction.get() ) );
+            new ActiveMQConsumer( consumerInfo, this, this->transaction ) );
 
         // Add the consumer to the map.
         synchronized( &this->consumers ) {
@@ -311,7 +311,7 @@
 
         // Create the consumer instance.
         std::auto_ptr<ActiveMQConsumer> consumer(
-            new ActiveMQConsumer( consumerInfo, this, this->transaction.get() ) );
+            new ActiveMQConsumer( consumerInfo, this, this->transaction ) );
 
         // Add the consumer to the map.
         synchronized( &this->consumers ) {

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h?rev=760007&r1=760006&r2=760007&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h Mon Mar 30 15:24:54 2009
@@ -72,7 +72,7 @@
         /**
          * Transaction Management object
          */
-        std::auto_ptr<ActiveMQTransactionContext> transaction;
+        Pointer<ActiveMQTransactionContext> transaction;
 
         /**
          * Connection