You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by nm...@apache.org on 2007/03/18 14:38:34 UTC

svn commit: r519611 - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/activemq/core/ test-integration/integration/connector/openwire/ test-integration/integration/connector/stomp/

Author: nmittler
Date: Sun Mar 18 06:38:33 2007
New Revision: 519611

URL: http://svn.apache.org/viewvc?view=rev&rev=519611
Log:
AMQCPP-81 - fixed distribution of messages already in queue

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireSimpleTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireSimpleTest.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleTest.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?view=diff&rev=519611&r1=519610&r2=519611
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Sun Mar 18 06:38:33 2007
@@ -217,10 +217,17 @@
             fire( ex );
 
             return;
-        }
+        }        
 
-        // When not started we drop incomming messages
-        if( !started )
+        // Look up the dispatcher.
+        Dispatcher* dispatcher = NULL;
+        synchronized( &dispatchers )
+        {
+            dispatcher = dispatchers.getValue(consumer->getConsumerId());
+        }
+        
+        // If we have no registered dispatcher, this is bad!! (should never happen)
+        if( dispatcher == NULL )
         {
             // Indicate to Broker that we received the message, but it
             // was not consumed.
@@ -233,21 +240,12 @@
             // Delete the message here
             delete message;
 
-            return;
-        }
-
-        // Look up the dispatcher.
-        Dispatcher* dispatcher = NULL;
-        synchronized( &dispatchers )
-        {
-            dispatcher = dispatchers.getValue(consumer->getConsumerId());
+            throw ActiveMQException(__FILE__, __LINE__, "no dispatcher registered for consumer" );
         }
 
         // Dispatch the message.
-        if( dispatcher != NULL ) {
-            DispatchData data( consumer, message );
-            dispatcher->dispatch( data );
-        }
+        DispatchData data( consumer, message );
+        dispatcher->dispatch( data );
     }
     catch( exceptions::ActiveMQException& ex )
     {

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp?view=diff&rev=519611&r1=519610&r2=519611
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp Sun Mar 18 06:38:33 2007
@@ -133,7 +133,9 @@
         consumer = consumers.getValue( data.getConsumer()->getConsumerId() );
     }
     
-    if( consumer != NULL ) {
+    if( consumer == NULL ) {
+        execute( data );
+    } else {
         consumer->dispatch( data );
     }
         

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireSimpleTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireSimpleTest.cpp?view=diff&rev=519611&r1=519610&r2=519611
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireSimpleTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireSimpleTest.cpp Sun Mar 18 06:38:33 2007
@@ -408,3 +408,60 @@
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
 }
+
+void OpenwireSimpleTest::testReceiveAlreadyInQueue() {
+    // Checks that a message sent before connection->start() is still handed to a consumer that already existed (the AMQCPP-81 scenario).
+        try
+    {
+        // NOTE(review): the debug banner below mentions defaultMsgCount/defaultDelay, but this test sends one message and sleeps a fixed 100.
+        if( IntegrationCommon::debug ) {
+            cout << "Starting activemqcms test (sending "
+                 << IntegrationCommon::defaultMsgCount
+                 << " messages per type and sleeping "
+                 << IntegrationCommon::defaultDelay 
+                 << " milli-seconds) ...\n"
+                 << endl;
+        }
+        
+        // Create the CMS factory and an OpenWire connection to localhost:61616; the connection is not started until further down.
+        cms::ConnectionFactory* factory = new ActiveMQConnectionFactory("tcp://localhost:61616?wireFormat=openwire");
+        cms::Connection* connection = factory->createConnection();
+
+        cms::Session* session = connection->createSession();
+        // The topic is named with a freshly generated GUID string.
+        cms::Topic* topic = session->createTopic(Guid::createGUIDString());
+        // The consumer is created before anything is sent to the topic.
+        cms::MessageConsumer* consumer = session->createConsumer( topic );
+        
+        cms::MessageProducer* producer = session->createProducer( topic );
+
+        cms::TextMessage* textMsg = session->createTextMessage();
+        
+        // Send a single text message while the connection has not yet been started
+        producer->send( textMsg );
+        
+        delete textMsg;
+        // Short pause before starting -- presumably lets the message reach this client first; TODO confirm.
+        Thread::sleep( 100 );
+        // Only now start the connection; the message sent above must still be received.
+        connection->start();
+
+        cms::Message* message = consumer->receive(1000);
+        CPPUNIT_ASSERT( message != NULL );
+        delete message;
+
+        if( IntegrationCommon::debug ) {
+            printf("Shutting Down\n" );
+        }
+        
+        connection->close();
+        // Release the CMS objects created above. NOTE(review): skipped if an exception propagates from the code above, so they leak on failure.
+        delete producer;                      
+        delete consumer;
+        delete topic;
+        delete session;
+        delete connection;
+        delete factory;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireSimpleTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireSimpleTest.h?view=diff&rev=519611&r1=519610&r2=519611
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireSimpleTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireSimpleTest.h Sun Mar 18 06:38:33 2007
@@ -36,6 +36,7 @@
         CPPUNIT_TEST( testSyncReceive );
         CPPUNIT_TEST( testMultipleConnections );
         CPPUNIT_TEST( testMultipleSessions );
+        CPPUNIT_TEST( testReceiveAlreadyInQueue );
         CPPUNIT_TEST_SUITE_END();
         
     public:
@@ -49,6 +50,7 @@
         virtual void testSyncReceive();
         virtual void testMultipleConnections();
         virtual void testMultipleSessions();
+        virtual void testReceiveAlreadyInQueue();
     };
 
 }}}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleTest.cpp?view=diff&rev=519611&r1=519610&r2=519611
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleTest.cpp Sun Mar 18 06:38:33 2007
@@ -409,3 +409,59 @@
     AMQ_CATCH_RETHROW( ActiveMQException )
 }
 
+void SimpleTest::testReceiveAlreadyInQueue() {
+    // Checks that a message sent before connection->start() is still handed to a consumer that already existed (the AMQCPP-81 scenario).
+        try
+    {
+        // NOTE(review): the debug banner below mentions defaultMsgCount/defaultDelay, but this test sends one message and sleeps a fixed 100.
+        if( IntegrationCommon::debug ) {
+            cout << "Starting activemqcms test (sending "
+                 << IntegrationCommon::defaultMsgCount
+                 << " messages per type and sleeping "
+                 << IntegrationCommon::defaultDelay 
+                 << " milli-seconds) ...\n"
+                 << endl;
+        }
+        
+        // Create the CMS factory and a STOMP connection to localhost:61613; the connection is not started until further down.
+        cms::ConnectionFactory* factory = new ActiveMQConnectionFactory("tcp://localhost:61613?wireFormat=stomp");
+        cms::Connection* connection = factory->createConnection();
+
+        cms::Session* session = connection->createSession();
+        // The topic is named with a freshly generated GUID string.
+        cms::Topic* topic = session->createTopic(Guid::createGUIDString());
+        // The consumer is created before anything is sent to the topic.
+        cms::MessageConsumer* consumer = session->createConsumer( topic );
+        
+        cms::MessageProducer* producer = session->createProducer( topic );
+
+        cms::TextMessage* textMsg = session->createTextMessage();
+        
+        // Send a single text message while the connection has not yet been started
+        producer->send( textMsg );
+        
+        delete textMsg;
+        // Short pause before starting -- presumably lets the message reach this client first; TODO confirm.
+        Thread::sleep( 100 );
+        // Only now start the connection; the message sent above must still be received.
+        connection->start();
+
+        cms::Message* message = consumer->receive(1000);
+        CPPUNIT_ASSERT( message != NULL );
+        delete message;
+
+        if( IntegrationCommon::debug ) {
+            printf("Shutting Down\n" );
+        }
+        
+        connection->close();
+        // Release the CMS objects created above. NOTE(review): skipped if an exception propagates from the code above, so they leak on failure.
+        delete producer;                      
+        delete consumer;
+        delete topic;
+        delete session;
+        delete connection;
+        delete factory;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleTest.h?view=diff&rev=519611&r1=519610&r2=519611
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleTest.h Sun Mar 18 06:38:33 2007
@@ -36,6 +36,7 @@
         CPPUNIT_TEST( testSyncReceive );
         CPPUNIT_TEST( testMultipleConnections );
         CPPUNIT_TEST( testMultipleSessions );
+        CPPUNIT_TEST( testReceiveAlreadyInQueue );
         CPPUNIT_TEST_SUITE_END();
         
     public:
@@ -49,6 +50,7 @@
         virtual void testSyncReceive();
         virtual void testMultipleConnections();
         virtual void testMultipleSessions();
+        virtual void testReceiveAlreadyInQueue();
 
     };