You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/06/10 22:57:39 UTC

svn commit: r1491601 - in /activemq/activemq-cpp/branches/3.7.x: ./ activemq-cpp/src/main/activemq/core/ activemq-cpp/src/main/activemq/threads/ activemq-cpp/src/test/activemq/core/

Author: tabish
Date: Mon Jun 10 20:57:39 2013
New Revision: 1491601

URL: http://svn.apache.org/r1491601
Log:
fix for: https://issues.apache.org/jira/browse/AMQCPP-487
fix and test for: https://issues.apache.org/jira/browse/AMQCPP-488

Modified:
    activemq/activemq-cpp/branches/3.7.x/   (props changed)
    activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/main/activemq/threads/CompositeTaskRunner.cpp
    activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.cpp
    activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.h

Propchange: activemq/activemq-cpp/branches/3.7.x/
------------------------------------------------------------------------------
  Merged /activemq/activemq-cpp/trunk:r1491595-1491600

Modified: activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=1491601&r1=1491600&r2=1491601&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Mon Jun 10 20:57:39 2013
@@ -955,8 +955,6 @@ void ActiveMQConnection::disconnect(long
                     e = ex;
                 }
             }
-
-            this->config->transport.reset(NULL);
         }
 
         // If we encountered an exception - throw the first one we encountered.

Modified: activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/main/activemq/threads/CompositeTaskRunner.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/main/activemq/threads/CompositeTaskRunner.cpp?rev=1491601&r1=1491600&r2=1491601&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/main/activemq/threads/CompositeTaskRunner.cpp (original)
+++ activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/main/activemq/threads/CompositeTaskRunner.cpp Mon Jun 10 20:57:39 2013
@@ -182,18 +182,18 @@ bool CompositeTaskRunner::iterate() {
 
     synchronized(&tasks) {
 
-        auto_ptr<Iterator<CompositeTask*> > iter(tasks.iterator());
-
-        while (iter->hasNext()) {
-
-            CompositeTask* task = iter->next();
+        for (int i = 0; i < tasks.size(); ++i) {
+            CompositeTask* task = tasks.pop();
 
             if (task->isPending()) {
                 task->iterate();
+                tasks.addLast(task);
 
                 // Always return true, so that we check again for any of
                 // the other tasks that might now be pending.
                 return true;
+            } else {
+                tasks.addLast(task);
             }
         }
     }

Modified: activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.cpp?rev=1491601&r1=1491600&r2=1491601&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.cpp (original)
+++ activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.cpp Mon Jun 10 20:57:39 2013
@@ -43,9 +43,9 @@ using namespace decaf::util;
 using namespace decaf::lang;
 
 namespace activemq {
-namespace core{
+namespace core {
 
-    class MyCommandListener : public transport::DefaultTransportListener{
+    class MyCommandListener: public transport::DefaultTransportListener {
     public:
 
         commands::Command* cmd;
@@ -53,47 +53,53 @@ namespace core{
     private:
 
         MyCommandListener(const MyCommandListener&);
-        MyCommandListener& operator= (const MyCommandListener&);
+        MyCommandListener& operator=(const MyCommandListener&);
 
     public:
 
-        MyCommandListener() : cmd(NULL) {}
-        virtual ~MyCommandListener(){}
+        MyCommandListener() : cmd(NULL) {
+        }
+        virtual ~MyCommandListener() {
+        }
 
-        virtual void onCommand( commands::Command* command ){
+        virtual void onCommand(commands::Command* command) {
             cmd = command;
         }
     };
 
-    class MyExceptionListener : public cms::ExceptionListener{
+    class MyExceptionListener: public cms::ExceptionListener {
     public:
 
         bool caughtOne;
 
     public:
 
-        MyExceptionListener() : caughtOne(false) {}
-        virtual ~MyExceptionListener(){}
+        MyExceptionListener() : caughtOne(false) {
+        }
+
+        virtual ~MyExceptionListener() {
+        }
 
-        virtual void onException(const cms::CMSException& ex AMQCPP_UNUSED){
+        virtual void onException(const cms::CMSException& ex AMQCPP_UNUSED) {
             caughtOne = true;
         }
     };
 
-    class MyDispatcher : public Dispatcher {
+    class MyDispatcher: public Dispatcher {
     public:
 
-        std::vector< decaf::lang::Pointer<commands::Message> > messages;
+        std::vector<decaf::lang::Pointer<commands::Message> > messages;
 
     public:
 
-        MyDispatcher() : messages() {}
-        virtual ~MyDispatcher(){}
+        MyDispatcher() :
+                messages() {
+        }
+        virtual ~MyDispatcher() {
+        }
 
-        virtual void dispatch( const decaf::lang::Pointer<commands::MessageDispatch>& data )
-            throw ( exceptions::ActiveMQException )
-        {
-            messages.push_back( data->getMessage() );
+        virtual void dispatch(const decaf::lang::Pointer<commands::MessageDispatch>& data) throw (exceptions::ActiveMQException) {
+            messages.push_back(data->getMessage());
         }
 
         virtual int getHashCode() const {
@@ -103,241 +109,104 @@ namespace core{
 }}
 
 ////////////////////////////////////////////////////////////////////////////////
-//void ActiveMQConnectionTest::test1WithStomp()
-//{
-//    try
-//    {
-//        MyMessageListener listener;
-//        MyExceptionListener exListener;
-//        MyCommandListener cmdListener;
-//        MyDispatcher msgListener;
-//        std::string connectionId = "testConnectionId";
-//        decaf::util::Properties* properties =
-//            new decaf::util::Properties();
-//        transport::Transport* transport = NULL;
-//
-//        // Default to Stomp
-//        properties->setProperty( "wireFormat", "stomp" );
-//        decaf::net::URI uri( "mock://mock?wireFormat=stomp" );
-//
-//        transport::TransportFactory* factory =
-//            transport::TransportRegistry::getInstance().findFactory( "mock" );
-//        if( factory == NULL ){
-//            CPPUNIT_ASSERT( false );
-//        }
-//
-//        // Create the transport.
-//        transport = factory->createComposite( uri );
-//        if( transport == NULL ){
-//            CPPUNIT_ASSERT( false );
-//        }
-//
-//        transport::mock::MockTransport* dTransport =
-//            dynamic_cast< transport::mock::MockTransport*>( transport );
-//
-//        CPPUNIT_ASSERT( dTransport != NULL );
-//
-//        dTransport->setCommandListener( &cmdListener );
-//
-//        connector::stomp::StompConnector* connector =
-//            new connector::stomp::StompConnector(
-//                transport, *properties );
-//
-//        connector->start();
-//
-//        ActiveMQConnection connection(
-//            new ActiveMQConnectionData(
-//                connector, transport, properties) );
-//
-//        // First - make sure exceptions are working.
-//        connection.setExceptionListener( &exListener );
-//        CPPUNIT_ASSERT( exListener.caughtOne == false );
-//        dTransport->fireException( exceptions::ActiveMQException( __FILE__, __LINE__, "test" ) );
-//        CPPUNIT_ASSERT( exListener.caughtOne == true );
-//
-//        cms::Session* session1 = connection.createSession();
-//        cms::Session* session2 = connection.createSession();
-//        cms::Session* session3 = connection.createSession();
-//
-//        CPPUNIT_ASSERT( session1 != NULL );
-//        CPPUNIT_ASSERT( session2 != NULL );
-//        CPPUNIT_ASSERT( session3 != NULL );
-//
-//        connector::stomp::StompSessionInfo session;
-//        connector::stomp::StompConsumerInfo consumer;
-//
-//        session.setSessionId( 1 );
-//        session.setConnectionId( "TEST:123" );
-//        session.setAckMode( cms::Session::AUTO_ACKNOWLEDGE );
-//
-//        connector::stomp::StompTopic myTopic( "test" );
-//        consumer.setConsumerId( 1 );
-//        consumer.setSessionInfo( &session );
-//        consumer.setDestination( &myTopic );
-//
-//        connection.addDispatcher( &consumer, &msgListener );
-//
-//        connector::stomp::commands::TextMessageCommand* cmd =
-//            new connector::stomp::commands::TextMessageCommand;
-//        connector::stomp::StompTopic topic1( "test" );
-//        cmd->setCMSDestination( &topic1 );
-//
-//        connector::ConsumerMessageListener* consumerListener =
-//            dynamic_cast< connector::ConsumerMessageListener* >(
-//                &connection );
-//
-//        connection.start();
-//
-//        CPPUNIT_ASSERT( consumerListener != NULL );
-//
-//        consumerListener->onConsumerMessage( &consumer, cmd );
-//
-//        CPPUNIT_ASSERT_EQUAL( 1, (int)msgListener.messages.size() );
-//
-//        connection.removeDispatcher( &consumer );
-//
-//        msgListener.messages.clear();
-//        consumerListener->onConsumerMessage( &consumer, cmd );
-//
-//        CPPUNIT_ASSERT_EQUAL( 0, (int)msgListener.messages.size() );
-//
-//        connection.addDispatcher( &consumer, &msgListener );
-//
-//        connection.stop();
-//        consumerListener->onConsumerMessage( &consumer, cmd );
-//        connection.start();
-//        CPPUNIT_ASSERT_EQUAL( 1, (int)msgListener.messages.size() );
-//
-//        delete cmd;
-//        cmd = new connector::stomp::commands::TextMessageCommand;
-//
-//        connector::stomp::StompTopic topic2( "test" );
-//        cmd->setCMSDestination( &topic2 );
-//
-//        consumerListener->onConsumerMessage( &consumer, cmd );
-//        CPPUNIT_ASSERT_EQUAL( 2, (int)msgListener.messages.size() );
-//
-//        connection.removeDispatcher( &consumer );
-//        msgListener.messages.clear();
-//
-//        session1->close();
-//        session2->close();
-//        session3->close();
-//        connection.close();
-//
-//        exListener.caughtOne = false;
-//        consumerListener->onConsumerMessage( &consumer, cmd );
-//        CPPUNIT_ASSERT( exListener.caughtOne == true );
-//
-//        delete cmd;
-//
-//        delete session1;
-//        delete session2;
-//        delete session3;
-//
-//    } catch( exceptions::ActiveMQException& ex ) {
-//        ex.printStackTrace();
-//        throw ex;
-//    }
-//}
-
-////////////////////////////////////////////////////////////////////////////////
-//void ActiveMQConnectionTest::test2WithStomp()
-//{
-//    try
-//    {
-//        MyMessageListener listener;
-//        MyExceptionListener exListener;
-//        MyCommandListener cmdListener;
-//        MyDispatcher msgListener;
-//        std::string connectionId = "testConnectionId";
-//        decaf::util::Properties* properties =
-//            new decaf::util::Properties();
-//        transport::Transport* transport = NULL;
-//
-//        // Default to Stomp
-//        properties->setProperty( "wireFormat", "stomp" );
-//        decaf::net::URI uri( "mock://mock?wireFormat=stomp" );
-//
-//        transport::TransportFactory* factory =
-//            transport::TransportRegistry::getInstance().findFactory( "mock" );
-//        if( factory == NULL ){
-//            CPPUNIT_ASSERT( false );
-//        }
-//
-//        // Create the transport.
-//        transport = factory->createComposite( uri );
-//        if( transport == NULL ){
-//            CPPUNIT_ASSERT( false );
-//        }
-//
-//        transport::mock::MockTransport* dTransport =
-//            dynamic_cast< transport::mock::MockTransport*>( transport );
-//
-//        CPPUNIT_ASSERT( dTransport != NULL );
-//
-//        dTransport->setCommandListener( &cmdListener );
-//
-//        connector::stomp::StompConnector* connector =
-//            new connector::stomp::StompConnector(
-//                transport, *properties );
-//
-//        connector->start();
-//
-//        ActiveMQConnection connection(
-//            new ActiveMQConnectionData(
-//                connector, transport, properties) );
-//
-//        connection.getClientID();
-//        connection.close();
-//
-//        CPPUNIT_ASSERT( connection.getClientID() == "" );
-//
-//    } catch( exceptions::ActiveMQException& ex ) {
-//        ex.printStackTrace();
-//        throw ex;
-//    }
-//}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnectionTest::test2WithOpenwire()
-{
-    try
-    {
+void ActiveMQConnectionTest::test2WithOpenwire() {
+    try {
         MyExceptionListener exListener;
         MyCommandListener cmdListener;
         MyDispatcher msgListener;
         std::string connectionId = "testConnectionId";
-        Pointer<decaf::util::Properties> properties( new decaf::util::Properties() );
+        Pointer<decaf::util::Properties> properties(new decaf::util::Properties());
         Pointer<Transport> transport;
 
-        // Default to Stomp
-        properties->setProperty( "wireFormat", "openwire" );
-        decaf::net::URI uri( "mock://mock?wireFormat=openwire" );
-
-        transport::TransportFactory* factory =
-            transport::TransportRegistry::getInstance().findFactory( "mock" );
-        if( factory == NULL ){
-            CPPUNIT_ASSERT( false );
+        properties->setProperty("wireFormat", "openwire");
+        decaf::net::URI uri("mock://mock?wireFormat=openwire");
+
+        transport::TransportFactory* factory = transport::TransportRegistry::getInstance().findFactory("mock");
+        if (factory == NULL) {
+            CPPUNIT_ASSERT(false);
         }
 
         // Create the transport.
-        transport = factory->createComposite( uri );
-        if( transport == NULL ){
-            CPPUNIT_ASSERT( false );
+        transport = factory->createComposite(uri);
+        if (transport == NULL) {
+            CPPUNIT_ASSERT(false);
         }
 
-        transport->setTransportListener( &cmdListener );
+        transport->setTransportListener(&cmdListener);
 
-        ActiveMQConnection connection( transport, properties );
+        ActiveMQConnection connection(transport, properties);
 
         connection.getClientID();
         connection.close();
 
-        CPPUNIT_ASSERT( connection.getClientID() == "" );
+        CPPUNIT_ASSERT(connection.getClientID() == "");
 
-    } catch( exceptions::ActiveMQException& ex ) {
+    } catch (exceptions::ActiveMQException& ex) {
         ex.printStackTrace();
         throw ex;
     }
 }
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    // Runnable that creates a failover connection (to an address the test presumably
+    // expects to be unreachable) and calls start() on it. The owning test reads the
+    // connection through getConnection() so that it can close it from another thread.
+    class TestCloseCancelsHungStartRunnable: public Runnable {
+    private:
+        // Declared but never defined: instances must not be copied or assigned.
+        TestCloseCancelsHungStartRunnable(const TestCloseCancelsHungStartRunnable&);
+        TestCloseCancelsHungStartRunnable& operator=(const TestCloseCancelsHungStartRunnable&);
+
+        std::auto_ptr<cms::Connection> connection;
+
+    public:
+        TestCloseCancelsHungStartRunnable() : connection(NULL) {}
+
+        // Releases the owned connection; anything thrown while doing so is swallowed.
+        virtual ~TestCloseCancelsHungStartRunnable() {
+            try {
+                connection.reset(NULL);
+            } catch (...) {
+            }
+        }
+
+        // Returns the connection created by run(), or NULL while none has been stored.
+        cms::Connection* getConnection() const { return this->connection.get(); }
+
+        // Creates the connection and starts it; every exception is deliberately swallowed.
+        virtual void run() {
+            try {
+                std::auto_ptr<ActiveMQConnectionFactory> factory(new ActiveMQConnectionFactory("failover://(tcp://123.132.0.1:61616)"));
+                connection.reset(factory->createConnection());
+                connection->start();
+            } catch (...) { }
+        }
+    };
+
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Verifies that close() from this thread unblocks a runner thread hung in Connection::start().
+void ActiveMQConnectionTest::testCloseCancelsHungStart() {
+    TestCloseCancelsHungStartRunnable runnable;
+    Thread runner(&runnable);
+    runner.start();
+
+    // Bounded poll (100 x 100 sleep units): run() swallows exceptions, so it may never store one.
+    for (int attempt = 0; attempt < 100 && runnable.getConnection() == NULL; ++attempt) {
+        Thread::sleep(100);
+    }
+    CPPUNIT_ASSERT(runnable.getConnection() != NULL);
+
+    runner.join(1000);
+    CPPUNIT_ASSERT(runner.isAlive());
+
+    try {
+        runnable.getConnection()->close();
+    } catch (...) { }
+
+    runner.join(2000);
+    CPPUNIT_ASSERT(!runner.isAlive());
+}

Modified: activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.h?rev=1491601&r1=1491600&r2=1491601&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.h (original)
+++ activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.h Mon Jun 10 20:57:39 2013
@@ -21,25 +21,23 @@
 #include <cppunit/TestFixture.h>
 #include <cppunit/extensions/HelperMacros.h>
 
-namespace activemq{
-namespace core{
+namespace activemq {
+namespace core {
+
+    class ActiveMQConnectionTest : public CppUnit::TestFixture {
 
-    class ActiveMQConnectionTest : public CppUnit::TestFixture
-    {
         CPPUNIT_TEST_SUITE( ActiveMQConnectionTest );
-//        CPPUNIT_TEST( test1WithStomp );
-//        CPPUNIT_TEST( test2WithStomp );
         CPPUNIT_TEST( test2WithOpenwire );
+        CPPUNIT_TEST( testCloseCancelsHungStart );
         CPPUNIT_TEST_SUITE_END();
 
     public:
 
-        ActiveMQConnectionTest() {};
+        ActiveMQConnectionTest() {}
         virtual ~ActiveMQConnectionTest() {}
 
-//        void test1WithStomp();
-//        void test2WithStomp();
         void test2WithOpenwire();
+        void testCloseCancelsHungStart();
 
     };