You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by nm...@apache.org on 2007/03/21 09:00:14 UTC

svn commit: r520792 - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/activemq/connector/stomp/ test/activemq/connector/stomp/

Author: nmittler
Date: Wed Mar 21 01:00:13 2007
New Revision: 520792

URL: http://svn.apache.org/viewvc?view=rev&rev=520792
Log:
AMQCPP-83 - fix to stomp connector for race condition when creating consumers

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConsumerInfo.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompQueue.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompTopic.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompConnectorTest.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompSessionManagerTest.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.cpp?view=diff&rev=520792&r1=520791&r2=520792
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.cpp Wed Mar 21 01:00:13 2007
@@ -312,6 +312,20 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void StompConnector::startConsumer(ConsumerInfo* consumer )
+        throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+
+        return sessionManager->startConsumer(consumer);
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
 ConsumerInfo* StompConnector::createDurableConsumer(
     const cms::Topic* topic,
     SessionInfo* session,

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.h?view=diff&rev=520792&r1=520791&r2=520792
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.h Wed Mar 21 01:00:13 2007
@@ -284,8 +284,8 @@
          * @param consumer - ConsumerInfo of a consumer that isn't started
          * @throws ConnectorException
          */
-        virtual void startConsumer( ConsumerInfo* consumer AMQCPP_UNUSED )
-            throw ( ConnectorException ) {}
+        virtual void startConsumer( ConsumerInfo* consumer )
+            throw ( ConnectorException );
 
         /**
          * Create a Consumer for the given Session

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConsumerInfo.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConsumerInfo.h?view=diff&rev=520792&r1=520791&r2=520792
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConsumerInfo.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConsumerInfo.h Wed Mar 21 01:00:13 2007
@@ -39,6 +39,11 @@
 
         // Session Info - We do not own this
         const SessionInfo* session;
+        
+        bool noLocal;
+        
+        // Subscription name for this consumer, "" for non-durables
+        std::string name;
 
     public:
 
@@ -48,6 +53,7 @@
             consumerId = 0;
             destination = NULL;
             session = NULL;
+            noLocal = false;
         }
 
         StompConsumerInfo( Connector* connector ) :
@@ -57,6 +63,7 @@
             consumerId = 0;
             destination = NULL;
             session = NULL;
+            noLocal = false;
         }
 
         virtual ~StompConsumerInfo() { 
@@ -126,6 +133,22 @@
          */
         virtual void setSessionInfo( const SessionInfo* session ) {
             this->session = session;
+        }
+        
+        virtual bool getNoLocal() const {
+            return noLocal;
+        }
+        
+        virtual void setNoLocal( bool noLocal ) {
+            this->noLocal = noLocal;
+        }
+        
+        virtual const std::string& getName() const {
+            return name;
+        }
+        
+        virtual void setName( const std::string& name ) {
+            this->name = name;
         }
 
     };

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompQueue.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompQueue.h?view=diff&rev=520792&r1=520791&r2=520792
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompQueue.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompQueue.h Wed Mar 21 01:00:13 2007
@@ -61,7 +61,7 @@
          * @returns cloned copy of this object
          */
         virtual cms::Destination* clone() const {
-            return new StompQueue( getQueueName() );
+            return new StompQueue( this );
         }
 
     protected:

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.cpp?view=diff&rev=520792&r1=520791&r2=520792
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.cpp Wed Mar 21 01:00:13 2007
@@ -149,8 +149,38 @@
 {
     try
     {
+        // Initialize a new Consumer info Message
+        StompConsumerInfo* consumer = new StompConsumerInfo( connector );
+
+        consumer->setConsumerId( getNextConsumerId() );
+        consumer->setDestination( destination );
+        consumer->setMessageSelector( selector );
+        consumer->setSessionInfo( session );
+        consumer->setNoLocal( noLocal );
+        consumer->setName( name );
+
+        return consumer;
+    }
+    AMQ_CATCH_RETHROW( StompConnectorException )
+    AMQ_CATCHALL_THROW( StompConnectorException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompSessionManager::startConsumer(
+    connector::ConsumerInfo* consumer) throw ( StompConnectorException )
+{
+    try
+    {
+        StompConsumerInfo* stompConsumer = dynamic_cast<StompConsumerInfo*>(consumer);
+        if( stompConsumer == NULL ) {
+            throw StompConnectorException(__FILE__, __LINE__,
+                "ConsumerInfo is not of type created by this connector" );
+        }
+        
         synchronized( &mutex )
         {
+            const cms::Destination* destination = stompConsumer->getDestination();
+            
             // Find the right mapping to consumers
             ConsumerMap& consumerMap =
                 destinationMap[ destination->toProviderString() ];
@@ -162,20 +192,20 @@
                 // Send the request to the Broker
                 SubscribeCommand cmd;
 
-                if( session->getAckMode() == cms::Session::CLIENT_ACKNOWLEDGE )
+                if( stompConsumer->getSessionInfo()->getAckMode() == cms::Session::CLIENT_ACKNOWLEDGE )
                 {
                     cmd.setAckMode( CommandConstants::ACK_CLIENT );
                 }
                 cmd.setDestination( destination->toProviderString() );
 
-                if( noLocal == true )
+                if( stompConsumer->getNoLocal() )
                 {
-                    cmd.setNoLocal( noLocal );
+                    cmd.setNoLocal( stompConsumer->getNoLocal() );
                 }
 
-                if( name != "" )
+                if( stompConsumer->getName() != "" )
                 {
-                    cmd.setSubscriptionName( name );
+                    cmd.setSubscriptionName( stompConsumer->getName() );
                 }
 
                 // Grab any options from the destination and set them
@@ -187,9 +217,9 @@
                 // that specifies a selector it will be ignored.  While
                 // this is not ideal, is the only way to handle the fact
                 // that activemq stomp doesn't support multiple sessions.
-                if( selector != "" )
+                if( stompConsumer->getMessageSelector() != "" )
                 {
-                    cmd.setMessageSelector( selector );
+                    cmd.setMessageSelector( stompConsumer->getMessageSelector() );
                 }
 
                 // Fire the message
@@ -202,22 +232,10 @@
                 }
             }
 
-            // Initialize a new Consumer info Message
-            ConsumerInfo* consumer = new StompConsumerInfo( connector );
-
-            consumer->setConsumerId( getNextConsumerId() );
-            consumer->setDestination( destination );
-            consumer->setMessageSelector( selector );
-            consumer->setSessionInfo( session );
-
             // Store this consumer for later message dispatching.
             consumerMap.insert(
-                make_pair( consumer->getConsumerId(), consumer ) );
-
-            return consumer;
+                make_pair( stompConsumer->getConsumerId(), stompConsumer ) );
         }
-
-        return NULL;
     }
     AMQ_CATCH_RETHROW( StompConnectorException )
     AMQ_CATCHALL_THROW( StompConnectorException )

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.h?view=diff&rev=520792&r1=520791&r2=520792
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.h Wed Mar 21 01:00:13 2007
@@ -140,6 +140,18 @@
             const std::string& selector = "",
             bool noLocal = false )
                 throw ( StompConnectorException );
+                
+        /**
+         * Given a valid Consumer info Object that was previously created
+         * by a call to <code>createConsumer</code>, the Consumer will be
+         * registered with the Broker, and be placed in a state in which
+         * it will now be able to receive messages.  All preparations
+         * for message receipt should be done before calling this method.
+         * @param consumer - ConsumerInfo of a consumer that isn't started
+         * @throws ConnectorException
+         */
+        virtual void startConsumer( connector::ConsumerInfo* consumer )
+                throw ( StompConnectorException );
 
         /**
          * Removes the Consumer from the session, will unsubscrive if the

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompTopic.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompTopic.h?view=diff&rev=520792&r1=520791&r2=520792
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompTopic.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompTopic.h Wed Mar 21 01:00:13 2007
@@ -62,7 +62,7 @@
          * @returns cloned copy of this object
          */
         virtual cms::Destination* clone() const {
-            return new StompTopic( getTopicName() );
+            return new StompTopic( this );
         }
 
     protected:

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompConnectorTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompConnectorTest.h?view=diff&rev=520792&r1=520791&r2=520792
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompConnectorTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompConnectorTest.h Wed Mar 21 01:00:13 2007
@@ -154,6 +154,7 @@
             std::string sel1 = "";
             StompTopic dest1( "dummy.topic.1" );
             ConsumerInfo* cinfo1 = connector->createConsumer( &dest1, info1, sel1 );
+            connector->startConsumer( cinfo1 );
             CPPUNIT_ASSERT( cinfo1->getSessionInfo() == info1 );
             CPPUNIT_ASSERT( cinfo1->getDestination()->toProviderString() == dest1.toProviderString() );
             CPPUNIT_ASSERT( cinfo1->getMessageSelector() == sel1 );
@@ -162,6 +163,7 @@
             std::string sel2 = "mysel2";
             StompTopic dest2( "dummy.topic.2" );
             ConsumerInfo* cinfo2 = connector->createConsumer( &dest2, info2, sel2 );
+            connector->startConsumer( cinfo2 );
             CPPUNIT_ASSERT( cinfo2->getSessionInfo() == info2 );
             CPPUNIT_ASSERT( cinfo2->getDestination()->toProviderString() == dest2.toProviderString() );
             CPPUNIT_ASSERT( cinfo2->getMessageSelector() == sel2 );
@@ -170,6 +172,7 @@
             std::string sel3 = "mysel3";
             StompQueue dest3( "dummy.queue.1" );
             ConsumerInfo* cinfo3 = connector->createConsumer( &dest3, info3, sel3 );
+            connector->startConsumer( cinfo3 );
             CPPUNIT_ASSERT( cinfo3->getSessionInfo() == info3 );
             CPPUNIT_ASSERT( cinfo3->getDestination()->toProviderString() == dest3.toProviderString() );
             CPPUNIT_ASSERT( cinfo3->getMessageSelector() == sel3 );
@@ -178,6 +181,7 @@
             std::string sel4 = "";
             StompTopic dest4( "dummy.queue.2" );
             ConsumerInfo* cinfo4 = connector->createConsumer( &dest4, info4, sel4 );
+            connector->startConsumer( cinfo4 );
             CPPUNIT_ASSERT( cinfo4->getSessionInfo() == info4 );
             CPPUNIT_ASSERT( cinfo4->getDestination()->toProviderString() == dest4.toProviderString() );
             CPPUNIT_ASSERT( cinfo4->getMessageSelector() == sel4 );
@@ -267,15 +271,19 @@
 
             SessionInfo* info1 = connector->createSession( cms::Session::AUTO_ACKNOWLEDGE );
             ConsumerInfo* cinfo1 = connector->createConsumer( &dest1, info1, "" );
+            connector->startConsumer( cinfo1 );
 
             SessionInfo* info2 = connector->createSession( cms::Session::DUPS_OK_ACKNOWLEDGE );
             ConsumerInfo* cinfo2 = connector->createConsumer( &dest1, info2, "" );
+            connector->startConsumer( cinfo2 );
 
             SessionInfo* info3 = connector->createSession( cms::Session::CLIENT_ACKNOWLEDGE );
             ConsumerInfo* cinfo3 = connector->createConsumer( &dest2, info3, "" );
+            connector->startConsumer( cinfo3 );
 
             SessionInfo* info4 = connector->createSession( cms::Session::SESSION_TRANSACTED );
             ConsumerInfo* cinfo4 = connector->createConsumer( &dest2, info4, "" );
+            connector->startConsumer( cinfo4 );
 
             MyMessageListener listener;
             connector->setConsumerMessageListener( &listener );
@@ -348,12 +356,14 @@
 
             SessionInfo* info1 = connector->createSession( cms::Session::AUTO_ACKNOWLEDGE );
             ConsumerInfo* cinfo1 = connector->createConsumer( &dest1, info1, "" );
+            connector->startConsumer( cinfo1 );
             CPPUNIT_ASSERT( cmdListener.cmd != NULL );
 
             cmdListener.cmd = NULL;
 
             SessionInfo* info2 = connector->createSession( cms::Session::DUPS_OK_ACKNOWLEDGE );
             ConsumerInfo* cinfo2 = connector->createConsumer( &dest1, info2, "" );
+            connector->startConsumer(cinfo2);
             CPPUNIT_ASSERT( cmdListener.cmd == NULL );
 
             cmdListener.cmd = NULL;

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompSessionManagerTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompSessionManagerTest.h?view=diff&rev=520792&r1=520791&r2=520792
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompSessionManagerTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompSessionManagerTest.h Wed Mar 21 01:00:13 2007
@@ -159,6 +159,7 @@
             std::string sel1 = "";
             StompTopic dest1( "dummy.topic.1" );
             ConsumerInfo* cinfo1 = manager.createConsumer( &dest1, info1, sel1 );
+            manager.startConsumer( cinfo1 );
             CPPUNIT_ASSERT( cinfo1->getSessionInfo() == info1 );
             CPPUNIT_ASSERT( cinfo1->getDestination()->toProviderString() == dest1.toProviderString() );
             CPPUNIT_ASSERT( cinfo1->getMessageSelector() == sel1 );
@@ -167,6 +168,7 @@
             std::string sel2 = "mysel2";
             StompTopic dest2( "dummy.topic.2" );
             ConsumerInfo* cinfo2 = manager.createConsumer( &dest2, info2, sel2 );
+            manager.startConsumer( cinfo2 );
             CPPUNIT_ASSERT( cinfo2->getSessionInfo() == info2 );
             CPPUNIT_ASSERT( cinfo2->getDestination()->toProviderString() == dest2.toProviderString() );
             CPPUNIT_ASSERT( cinfo2->getMessageSelector() == sel2 );
@@ -175,6 +177,7 @@
             std::string sel3 = "mysel3";
             StompQueue dest3( "dummy.queue.1" );
             ConsumerInfo* cinfo3 = manager.createConsumer( &dest3, info3, sel3 );
+            manager.startConsumer( cinfo3 );
             CPPUNIT_ASSERT( cinfo3->getSessionInfo() == info3 );
             CPPUNIT_ASSERT( cinfo3->getDestination()->toProviderString() == dest3.toProviderString() );
             CPPUNIT_ASSERT( cinfo3->getMessageSelector() == sel3 );
@@ -183,6 +186,7 @@
             std::string sel4 = "";
             StompTopic dest4( "dummy.queue.2" );
             ConsumerInfo* cinfo4 = manager.createConsumer( &dest4, info4, sel4 );
+            manager.startConsumer( cinfo4 );
             CPPUNIT_ASSERT( cinfo4->getSessionInfo() == info4 );
             CPPUNIT_ASSERT( cinfo4->getDestination()->toProviderString() == dest4.toProviderString() );
             CPPUNIT_ASSERT( cinfo4->getMessageSelector() == sel4 );
@@ -217,15 +221,19 @@
 
             SessionInfo* info1 = manager.createSession( cms::Session::AUTO_ACKNOWLEDGE );
             ConsumerInfo* cinfo1 = manager.createConsumer( &dest1, info1, "" );
+            manager.startConsumer( cinfo1 );
 
             SessionInfo* info2 = manager.createSession( cms::Session::DUPS_OK_ACKNOWLEDGE );
             ConsumerInfo* cinfo2 = manager.createConsumer( &dest1, info2, "" );
+            manager.startConsumer( cinfo2 );
 
             SessionInfo* info3 = manager.createSession( cms::Session::CLIENT_ACKNOWLEDGE );
             ConsumerInfo* cinfo3 = manager.createConsumer( &dest2, info3, "" );
+            manager.startConsumer( cinfo3 );
 
             SessionInfo* info4 = manager.createSession( cms::Session::SESSION_TRANSACTED );
             ConsumerInfo* cinfo4 = manager.createConsumer( &dest2, info4, "" );
+            manager.startConsumer( cinfo4 );
 
             MyMessageListener listener;
             manager.setConsumerMessageListener( &listener );
@@ -287,12 +295,14 @@
 
             SessionInfo* info1 = manager.createSession( cms::Session::AUTO_ACKNOWLEDGE );
             ConsumerInfo* cinfo1 = manager.createConsumer( &dest1, info1, "" );
+            manager.startConsumer( cinfo1 );
             CPPUNIT_ASSERT( cmdListener.cmd != NULL );
 
             cmdListener.cmd = NULL;
 
             SessionInfo* info2 = manager.createSession( cms::Session::DUPS_OK_ACKNOWLEDGE );
             ConsumerInfo* cinfo2 = manager.createConsumer( &dest1, info2, "" );
+            manager.startConsumer( cinfo2 );
             CPPUNIT_ASSERT( cmdListener.cmd == NULL );
 
             cmdListener.cmd = NULL;
@@ -353,7 +363,9 @@
 
             cmdListener.expected.clear();
             StompTopic dest1( "dummy.topic.1" );
+            
             consumer = manager.createConsumer( &dest1, session, "" );
+            manager.startConsumer( consumer );
             CPPUNIT_ASSERT( consumer != NULL );
             CPPUNIT_ASSERT( cmdListener.subscribe != NULL );
 
@@ -366,7 +378,9 @@
             cmdListener.expected.clear();
             cmdListener.expected.push_back( retroactive );
             StompTopic dest2( "dummy.topic.1?consumer.retroactive=true" );
+            
             consumer = manager.createConsumer( &dest2, session, "" );
+            manager.startConsumer( consumer );
             CPPUNIT_ASSERT( consumer != NULL );
             CPPUNIT_ASSERT( cmdListener.subscribe != NULL );
 
@@ -395,7 +409,9 @@
                 "consumer.selector=" + selector.second + "&" +
                 "consumer.exclusive=" + exclusive.second + "&" +
                 "consumer.priority=" + priority.second );
+            
             consumer = manager.createConsumer( &dest3, session, "" );
+            manager.startConsumer( consumer );
             CPPUNIT_ASSERT( consumer != NULL );
             CPPUNIT_ASSERT( cmdListener.subscribe != NULL );
 
@@ -423,10 +439,12 @@
                 "consumer.selector=" + selector.second + "&" +
                 "consumer.exclusive=" + exclusive.second + "&" +
                 "consumer.priority=" + priority.second );
+            
             consumer = manager.createConsumer( &dest4, session, "", true );
+            manager.startConsumer( consumer );
             CPPUNIT_ASSERT( consumer != NULL );
             CPPUNIT_ASSERT( cmdListener.subscribe != NULL );
-
+            
             manager.removeConsumer( consumer );
             CPPUNIT_ASSERT( cmdListener.cmd != NULL );
             delete consumer;