You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by nm...@apache.org on 2007/03/18 20:40:53 UTC

svn commit: r519679 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core: ActiveMQConnection.cpp ActiveMQConnection.h ActiveMQSession.cpp ActiveMQSession.h ActiveMQSessionExecutor.cpp

Author: nmittler
Date: Sun Mar 18 12:40:52 2007
New Revision: 519679

URL: http://svn.apache.org/viewvc?view=rev&rev=519679
Log:
AMQCPP-90 - simplifying the execution of session threads

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?view=diff&rev=519679&r1=519678&r2=519679
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Sun Mar 18 12:40:52 2007
@@ -39,9 +39,6 @@
     this->closed = false;
     this->exceptionListener = NULL;
 
-    alwaysSessionAsync = Boolean::parseBoolean(
-        connectionData->getProperties().getProperty( "alwaysSessionAsync", "true" ) );
-
     // Register for messages and exceptions from the connector.
     Connector* connector = connectionData->getConnector();
     connector->setConsumerMessageListener( this );
@@ -97,16 +94,12 @@
 {
     try
     {
-        // Determine whether or not to make dispatch for this session asynchronous
-        bool doSessionAsync = alwaysSessionAsync || !activeSessions.isEmpty() ||
-            ackMode==Session::SESSION_TRANSACTED || ackMode==Session::CLIENT_ACKNOWLEDGE;
 
         // Create the session instance.
         ActiveMQSession* session = new ActiveMQSession(
             connectionData->getConnector()->createSession( ackMode ),
             connectionData->getProperties(),
-            this,
-            doSessionAsync );
+            this );
 
         // Add the session to the set of active sessions.
         synchronized( &activeSessions ) {
@@ -224,28 +217,18 @@
         synchronized( &dispatchers )
         {
             dispatcher = dispatchers.getValue(consumer->getConsumerId());
-        }
         
-        // If we have no registered dispatcher, this is bad!! (should never happen)
-        if( dispatcher == NULL )
-        {
-            // Indicate to Broker that we received the message, but it
-            // was not consumed.
-            connectionData->getConnector()->acknowledge(
-                consumer->getSessionInfo(),
-                consumer,
-                dynamic_cast<Message*>( message ),
-                Connector::DeliveredAck );
-
-            // Delete the message here
-            delete message;
-
-            throw ActiveMQException(__FILE__, __LINE__, "no dispatcher registered for consumer" );
-        }
-
-        // Dispatch the message.
-        DispatchData data( consumer, message );
-        dispatcher->dispatch( data );
+            // If we have no registered dispatcher, the consumer was probably
+            // just closed.  Just delete the message.
+            if( dispatcher == NULL ) {                
+                delete message;                
+            } else {
+    
+                // Dispatch the message.
+                DispatchData data( consumer, message );
+                dispatcher->dispatch( data );
+            }
+        }
     }
     catch( exceptions::ActiveMQException& ex )
     {

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?view=diff&rev=519679&r1=519678&r2=519679
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h Sun Mar 18 12:40:52 2007
@@ -83,12 +83,6 @@
          * Maintain the set of all active sessions.
          */
         util::Set<ActiveMQSession*> activeSessions;
-        
-        /**
-         * If true, dispatch for all sessions will be asynchronous to the 
-         * transport.
-         */
-        bool alwaysSessionAsync;
 
     public:
 
@@ -210,7 +204,7 @@
         virtual void onConsumerMessage( connector::ConsumerInfo* consumer,
                                         core::ActiveMQMessage* message );
 
-    private:
+    public:
 
         /**
          * Notify the excpetion listener

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?view=diff&rev=519679&r1=519678&r2=519679
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Sun Mar 18 12:40:52 2007
@@ -41,8 +41,7 @@
 ////////////////////////////////////////////////////////////////////////////////
 ActiveMQSession::ActiveMQSession( SessionInfo* sessionInfo,
                                   const Properties& properties,
-                                  ActiveMQConnection* connection,
-                                  bool sessionAsyncDispatch )
+                                  ActiveMQConnection* connection )
 {
     if( sessionInfo == NULL || connection == NULL )
     {
@@ -56,7 +55,6 @@
     this->connection   = connection;
     this->closed       = false;
     this->asyncThread  = NULL;
-    this->sessionAsyncDispatch = sessionAsyncDispatch;
     this->useAsyncSend = Boolean::parseBoolean(
         properties.getProperty( "useAsyncSend", "false" ) );
 
@@ -90,6 +88,13 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::fire( exceptions::ActiveMQException& ex ) {
+    if( connection != NULL ) {
+        connection->fire( ex );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSession::close() throw ( cms::CMSException )
 {
     // If we're already close, just get outta' here.
@@ -717,6 +722,11 @@
 
         if( consumer != NULL )
         {
+            bool wasStarted = isStarted();
+            if( wasStarted ) {
+                stop();
+            }
+            
             // Remove the dispatcher for the Connection
             connection->removeDispatcher( consumer );
 
@@ -730,6 +740,10 @@
             // Remove this consumer from the consumers map.
             synchronized( &consumers ) {
                 consumers.remove( consumer->getConsumerId() );
+            }
+            
+            if( wasStarted ) {
+                start();
             }
         }
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h?view=diff&rev=519679&r1=519678&r2=519679
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h Sun Mar 18 12:40:52 2007
@@ -89,11 +89,6 @@
          * Is this Session using Async Sends.
          */
         bool useAsyncSend;
-        
-        /**
-         * Indicates whether or not dispatching should be done asynchronously.
-         */
-        bool sessionAsyncDispatch;
 
         /**
          * Outgoing Message Queue
@@ -106,18 +101,10 @@
 
         ActiveMQSession( connector::SessionInfo* sessionInfo,
                          const util::Properties& properties,
-                         ActiveMQConnection* connection,
-                         bool sessionAsyncDispatch );
+                         ActiveMQConnection* connection );
 
         virtual ~ActiveMQSession();
         
-        /**
-         * Indicates whether or not dispatching should be done asynchronously.
-         */
-        bool isSessionAsyncDispatch() const {
-            return sessionAsyncDispatch;
-        }
-        
         util::Map<long long, ActiveMQConsumer*>& getConsumers() {
             return consumers;
         }
@@ -143,6 +130,11 @@
          * state.
          */
         bool isStarted() const;
+        
+        /**
+         * Fires the given exception to the exception listener of the connection
+         */
+        void fire( exceptions::ActiveMQException& ex );
     
     public:  // Methods from ActiveMQMessageDispatcher
     

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp?view=diff&rev=519679&r1=519678&r2=519679
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp Sun Mar 18 12:40:52 2007
@@ -47,32 +47,20 @@
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSessionExecutor::execute( DispatchData& data ) {
         
-    // If dispatch async is off, just dispatch in the context of the transport
-    // thread.
-    if ( !session->isSessionAsyncDispatch() ){
-        dispatch(data);
-    }else {
-        
-        synchronized( &messageQueue ) {
-            messageQueue.push(data);
-            wakeup();
-        }
+    // Add the data to the queue.
+    synchronized( &messageQueue ) {
+        messageQueue.push(data);
+        wakeup();
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSessionExecutor::executeFirst( DispatchData& data ) {
         
-    // If dispatch async is off, just dispatch in the context of the transport
-    // thread.
-    if ( !session->isSessionAsyncDispatch() && started ){
-        dispatch(data);
-    }else {
-        
-        synchronized( &messageQueue ) {
-            messageQueue.enqueueFront(data);
-            wakeup();
-        }
+    // Add the data to the front of the queue.
+    synchronized( &messageQueue ) {
+        messageQueue.enqueueFront(data);
+        wakeup();
     }
 }
     
@@ -83,7 +71,7 @@
         started = true;
         
         // Don't create the thread unless we need to.
-        if( session->isSessionAsyncDispatch() && thread == NULL ) {
+        if( thread == NULL ) {
             thread = new Thread( this );
             thread->start();
         }
@@ -126,19 +114,32 @@
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSessionExecutor::dispatch( DispatchData& data ) {
         
-    ActiveMQConsumer* consumer = NULL;
-    util::Map<long long, ActiveMQConsumer*>& consumers = session->getConsumers();
-    
-    synchronized(&consumers) {
-        consumer = consumers.getValue( data.getConsumer()->getConsumerId() );
-    }
-    
-    if( consumer == NULL ) {
-        execute( data );
-    } else {
-        consumer->dispatch( data );
-    }
+    try {
+        ActiveMQConsumer* consumer = NULL;
+        util::Map<long long, ActiveMQConsumer*>& consumers = session->getConsumers();
+        
+        synchronized(&consumers) {
+            consumer = consumers.getValue( data.getConsumer()->getConsumerId() );
+        }
+        
+        // If the consumer is not available, just delete the message.
+        // Otherwise, dispatch the message to the consumer.
+        if( consumer == NULL ) {
+            delete data.getMessage();
+        } else {
+            consumer->dispatch( data );
+        }
         
+    } catch( ActiveMQException& ex ) {
+        ex.setMark(__FILE__, __LINE__ );
+        ex.printStackTrace();
+    } catch( std::exception& ex ) {
+        ActiveMQException amqex( __FILE__, __LINE__, ex.what() );
+        amqex.printStackTrace();
+    } catch( ... ) {
+        ActiveMQException amqex( __FILE__, __LINE__, "caught unknown exception" );
+        amqex.printStackTrace();
+    }
 }
     
 ////////////////////////////////////////////////////////////////////////////////
@@ -162,10 +163,14 @@
         }
         
     } catch( ActiveMQException& ex ) {
-        ex.printStackTrace();
+        ex.setMark(__FILE__, __LINE__ );
+        session->fire( ex );
+    } catch( std::exception& stdex ) {
+        ActiveMQException ex( __FILE__, __LINE__, stdex.what() );
+        session->fire( ex );
     } catch( ... ) {
         ActiveMQException ex(__FILE__, __LINE__, "caught unknown exception" );
-        ex.printStackTrace();
+        session->fire( ex );
     }
 }
 
@@ -190,12 +195,8 @@
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSessionExecutor::wakeup() {
  
-    if( session->isSessionAsyncDispatch() ) {
-        synchronized( &messageQueue ) {
-            messageQueue.notifyAll();
-        }
-    } else if( started ){
-        dispatchAll();
+    synchronized( &messageQueue ) {
+        messageQueue.notifyAll();
     }
 }